You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/03/03 22:59:28 UTC
[3/4] beam git commit: Add a Test for windowed CombineGloballyAsSingletonView
Add a Test for windowed CombineGloballyAsSingletonView
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3de44a34
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3de44a34
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3de44a34
Branch: refs/heads/master
Commit: 3de44a348e3e0934c644c718255a43b8f42a3534
Parents: 079966c
Author: Thomas Groh <tg...@google.com>
Authored: Fri Mar 3 11:24:14 2017 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Mar 3 14:59:12 2017 -0800
----------------------------------------------------------------------
.../apache/beam/sdk/transforms/CombineTest.java | 46 ++++++++++++++++++++
1 file changed, 46 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/3de44a34/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
index 5b18384..6c62d0b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
@@ -53,6 +53,7 @@ import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.RunnableOnService;
@@ -75,8 +76,10 @@ import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.TimestampedValue;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
+import org.joda.time.Instant;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -627,6 +630,49 @@ public class CombineTest implements Serializable {
}
@Test
+ @Category(RunnableOnService.class)
+ public void testWindowedCombineGloballyAsSingletonView() { // reads a windowed, globally-combined singleton side input from a ParDo
+ FixedWindows windowFn = FixedWindows.of(Duration.standardMinutes(1)); // the same one-minute fixed windows are applied to side and main input
+ final PCollectionView<Integer> view =
+ pipeline
+ .apply(
+ "CreateSideInput",
+ Create.timestamped(
+ TimestampedValue.of(1, new Instant(100)), // both side-input values carry timestamp 100,
+ TimestampedValue.of(3, new Instant(100)))) // so windowFn places them in the same window
+ .apply("WindowSideInput", Window.<Integer>into(windowFn))
+ .apply("CombineSideInput", Sum.integersGlobally().asSingletonView()); // per-window sum exposed as a singleton view
+ // Main input: one element at the side-input timestamp and one at the minimum timestamp.
+ TimestampedValue<Void> nonEmptyElement = TimestampedValue.of(null, new Instant(100)); // same timestamp as the side-input values
+ TimestampedValue<Void> emptyElement = TimestampedValue.atMinimumTimestamp(null); // presumably lands in a window holding no side-input values
+ PCollection<Integer> output =
+ pipeline
+ .apply(
+ "CreateMainInput",
+ Create.<Void>timestamped(nonEmptyElement, emptyElement).withCoder(VoidCoder.of()))
+ .apply("WindowMainInput", Window.<Void>into(windowFn))
+ .apply(
+ "OutputSideInput",
+ ParDo.of(
+ new DoFn<Void, Integer>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ c.output(c.sideInput(view)); // emit the side-input value seen by this main-input element
+ }
+ })
+ .withSideInputs(view));
+ // Assert on the whole output first, then on each main-input element's window separately.
+ PAssert.that(output).containsInAnyOrder(4, 0); // one output per main-input element
+ PAssert.that(output)
+ .inWindow(windowFn.assignWindow(nonEmptyElement.getTimestamp()))
+ .containsInAnyOrder(4); // 1 + 3 from the two side-input values at timestamp 100
+ PAssert.that(output)
+ .inWindow(windowFn.assignWindow(emptyElement.getTimestamp()))
+ .containsInAnyOrder(0); // NOTE(review): 0 is presumably the Sum default for a window with no side-input values - confirm
+ pipeline.run();
+ }
+
+ @Test
public void testCombineGetName() {
assertEquals("Combine.globally(SumInts)", Combine.globally(new SumInts()).getName());
assertEquals(