You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/06/30 21:31:23 UTC

[21/50] beam git commit: Add a Combine Test for Sliding Windows without Context

Add a Combine Test for Sliding Windows without Context


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6ade8426
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6ade8426
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6ade8426

Branch: refs/heads/gearpump-runner
Commit: 6ade8426edc2ace1a9bec8f9501d8dad17e91365
Parents: 1e16aa2
Author: Thomas Groh <tg...@google.com>
Authored: Wed Jun 28 12:51:31 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Wed Jun 28 12:52:37 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/transforms/CombineTest.java | 63 ++++++++++++++++++++
 1 file changed, 63 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/6ade8426/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
index e2469ab..b24d82d 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
@@ -29,11 +29,13 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 
 import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
@@ -325,6 +327,67 @@ public class CombineTest implements Serializable {
 
   @Test
   @Category(ValidatesRunner.class)
+  public void testSlidingWindowsCombine() {
+    PCollection<String> input =
+        pipeline
+            .apply(
+                Create.timestamped(
+                    TimestampedValue.of("a", new Instant(1L)),
+                    TimestampedValue.of("b", new Instant(2L)),
+                    TimestampedValue.of("c", new Instant(3L))))
+            .apply(
+                Window.<String>into(
+                    SlidingWindows.of(Duration.millis(3)).every(Duration.millis(1L))));
+    PCollection<List<String>> combined =
+        input.apply(
+            Combine.globally(
+                    new CombineFn<String, List<String>, List<String>>() {
+                      @Override
+                      public List<String> createAccumulator() {
+                        return new ArrayList<>();
+                      }
+
+                      @Override
+                      public List<String> addInput(List<String> accumulator, String input) {
+                        accumulator.add(input);
+                        return accumulator;
+                      }
+
+                      @Override
+                      public List<String> mergeAccumulators(Iterable<List<String>> accumulators) {
+                        // Mutate all of the accumulators. Instances should be used in only one
+                        // place, and not
+                        // reused after merging.
+                        List<String> cur = createAccumulator();
+                        for (List<String> accumulator : accumulators) {
+                          accumulator.addAll(cur);
+                          cur = accumulator;
+                        }
+                        return cur;
+                      }
+
+                      @Override
+                      public List<String> extractOutput(List<String> accumulator) {
+                        List<String> result = new ArrayList<>(accumulator);
+                        Collections.sort(result);
+                        return result;
+                      }
+                    })
+                .withoutDefaults());
+
+    PAssert.that(combined)
+        .containsInAnyOrder(
+            ImmutableList.of("a"),
+            ImmutableList.of("a", "b"),
+            ImmutableList.of("a", "b", "c"),
+            ImmutableList.of("b", "c"),
+            ImmutableList.of("c"));
+
+    pipeline.run();
+  }
+
+  @Test
+  @Category(ValidatesRunner.class)
   public void testSlidingWindowsCombineWithContext() {
     // [a: 1, 1], [a: 4; b: 1], [b: 13]
     PCollection<KV<String, Integer>> perKeyInput =