You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/06/24 21:24:10 UTC

[3/5] incubator-beam git commit: Update test for GatherAllPanes

Update test for GatherAllPanes


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ec1bb3a6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ec1bb3a6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ec1bb3a6

Branch: refs/heads/master
Commit: ec1bb3a62cb69fc221ed0370679fb91ffafebf66
Parents: a1365bb
Author: Thomas Groh <tg...@google.com>
Authored: Mon Jun 20 14:39:54 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Jun 24 10:46:48 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/util/GatherAllPanesTest.java    | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ec1bb3a6/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GatherAllPanesTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GatherAllPanesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GatherAllPanesTest.java
index e9be41e..a6522ef 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GatherAllPanesTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GatherAllPanesTest.java
@@ -23,6 +23,7 @@ import org.apache.beam.sdk.io.CountingInput;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.Values;
@@ -33,6 +34,7 @@ import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
 import org.apache.beam.sdk.values.TypeDescriptor;
 
 import com.google.common.collect.Iterables;
@@ -98,8 +100,12 @@ public class GatherAllPanesTest implements Serializable {
   public void multiplePanesMultipleReifiedPane() {
     TestPipeline p = TestPipeline.create();
 
+    PCollection<Long> someElems = p.apply("someLongs", CountingInput.upTo(20000));
+    PCollection<Long> otherElems = p.apply("otherLongs", CountingInput.upTo(20000));
     PCollection<Iterable<WindowedValue<Iterable<Long>>>> accumulatedPanes =
-        p.apply(CountingInput.upTo(20000))
+        PCollectionList.of(someElems)
+            .and(otherElems)
+            .apply(Flatten.<Long>pCollections())
             .apply(
                 WithTimestamps.of(
                     new SerializableFunction<Long, Instant>() {