You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@beam.apache.org by jb...@apache.org on 2017/07/20 19:53:28 UTC

[32/50] [abbrv] beam git commit: Add GroupByKey tests for Multiple & Merging windows

Add GroupByKey tests for Multiple & Merging windows

This adds explicit test coverage for GroupByKey when elements are assigned
to multiple windows (sliding windows) or to merging windows (sessions).


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1e947045
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1e947045
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1e947045

Branch: refs/heads/DSL_SQL
Commit: 1e947045a54bd59b449fd56f8f5f50879b6d9c4c
Parents: be5b934
Author: Thomas Groh <tg...@google.com>
Authored: Mon Jul 17 13:38:11 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Jul 18 17:52:55 2017 -0700

----------------------------------------------------------------------
 .../beam/sdk/transforms/GroupByKeyTest.java     | 156 +++++++++++++++----
 1 file changed, 122 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/1e947045/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
index 4b5d5f5..8fcb4c0 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
@@ -23,18 +23,20 @@ import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.hasItem;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
-import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
 
 import com.google.common.base.Function;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
@@ -56,9 +58,12 @@ import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.InvalidWindows;
 import org.apache.beam.sdk.transforms.windowing.Repeatedly;
 import org.apache.beam.sdk.transforms.windowing.Sessions;
+import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
 import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.KV;
@@ -67,6 +72,7 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TimestampedValue;
 import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.beam.sdk.values.WindowingStrategy;
+import org.hamcrest.Matcher;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.junit.Assert;
@@ -82,13 +88,13 @@ import org.junit.runners.JUnit4;
  */
 @RunWith(JUnit4.class)
 @SuppressWarnings({"rawtypes", "unchecked"})
-public class GroupByKeyTest {
+public class GroupByKeyTest implements Serializable {
 
   @Rule
-  public final TestPipeline p = TestPipeline.create();
+  public transient TestPipeline p = TestPipeline.create();
 
   @Rule
-  public ExpectedException thrown = ExpectedException.none();
+  public transient ExpectedException thrown = ExpectedException.none();
 
   @Test
   @Category(ValidatesRunner.class)
@@ -109,27 +115,18 @@ public class GroupByKeyTest {
     PCollection<KV<String, Iterable<Integer>>> output =
         input.apply(GroupByKey.<String, Integer>create());
 
-    PAssert.that(output)
-        .satisfies(new AssertThatHasExpectedContentsForTestGroupByKey());
+    SerializableFunction<Iterable<KV<String, Iterable<Integer>>>, Void> checker =
+        containsKvs(
+            kv("k1", 3, 4),
+            kv("k5", Integer.MIN_VALUE, Integer.MAX_VALUE),
+            kv("k2", 66, -33),
+            kv("k3", 0));
+    PAssert.that(output).satisfies(checker);
+    PAssert.that(output).inWindow(GlobalWindow.INSTANCE).satisfies(checker);
 
     p.run();
   }
 
-  static class AssertThatHasExpectedContentsForTestGroupByKey
-      implements SerializableFunction<Iterable<KV<String, Iterable<Integer>>>,
-                                      Void> {
-    @Override
-    public Void apply(Iterable<KV<String, Iterable<Integer>>> actual) {
-      assertThat(actual, containsInAnyOrder(
-          isKv(is("k1"), containsInAnyOrder(3, 4)),
-          isKv(is("k5"), containsInAnyOrder(Integer.MAX_VALUE,
-                                            Integer.MIN_VALUE)),
-          isKv(is("k2"), containsInAnyOrder(66, -33)),
-          isKv(is("k3"), containsInAnyOrder(0))));
-      return null;
-    }
-  }
-
   @Test
   @Category(ValidatesRunner.class)
   public void testGroupByKeyAndWindows() {
@@ -150,24 +147,115 @@ public class GroupByKeyTest {
              .apply(GroupByKey.<String, Integer>create());
 
     PAssert.that(output)
-        .satisfies(new AssertThatHasExpectedContentsForTestGroupByKeyAndWindows());
+        .satisfies(
+            containsKvs(
+                kv("k1", 3),
+                kv("k1", 4),
+                kv("k5", Integer.MAX_VALUE, Integer.MIN_VALUE),
+                kv("k2", 66),
+                kv("k2", -33),
+                kv("k3", 0)));
+    PAssert.that(output)
+        .inWindow(new IntervalWindow(new Instant(0L), Duration.millis(5L)))
+        .satisfies(
+            containsKvs(kv("k1", 3), kv("k5", Integer.MIN_VALUE, Integer.MAX_VALUE), kv("k2", 66)));
+    PAssert.that(output)
+        .inWindow(new IntervalWindow(new Instant(5L), Duration.millis(5L)))
+        .satisfies(containsKvs(kv("k1", 4), kv("k2", -33), kv("k3", 0)));
+
+    p.run();
+  }
+
+  @Test
+  @Category(ValidatesRunner.class)
+  public void testGroupByKeyMultipleWindows() {
+    PCollection<KV<String, Integer>> windowedInput =
+        p.apply(
+                Create.timestamped(
+                    TimestampedValue.of(KV.of("foo", 1), new Instant(1)),
+                    TimestampedValue.of(KV.of("foo", 4), new Instant(4)),
+                    TimestampedValue.of(KV.of("bar", 3), new Instant(3))))
+            .apply(
+                Window.<KV<String, Integer>>into(
+                    SlidingWindows.of(Duration.millis(5L)).every(Duration.millis(3L))));
+
+    PCollection<KV<String, Iterable<Integer>>> output =
+        windowedInput.apply(GroupByKey.<String, Integer>create());
+
+    PAssert.that(output)
+        .satisfies(
+            containsKvs(kv("foo", 1, 4), kv("foo", 1), kv("foo", 4), kv("bar", 3), kv("bar", 3)));
+    PAssert.that(output)
+        .inWindow(new IntervalWindow(new Instant(-3L), Duration.millis(5L)))
+        .satisfies(containsKvs(kv("foo", 1)));
+    PAssert.that(output)
+        .inWindow(new IntervalWindow(new Instant(0L), Duration.millis(5L)))
+        .satisfies(containsKvs(kv("foo", 1, 4), kv("bar", 3)));
+    PAssert.that(output)
+        .inWindow(new IntervalWindow(new Instant(3L), Duration.millis(5L)))
+        .satisfies(containsKvs(kv("foo", 4), kv("bar", 3)));
+
+    p.run();
+  }
+
+  @Test
+  @Category(ValidatesRunner.class)
+  public void testGroupByKeyMergingWindows() {
+    PCollection<KV<String, Integer>> windowedInput =
+        p.apply(
+                Create.timestamped(
+                    TimestampedValue.of(KV.of("foo", 1), new Instant(1)),
+                    TimestampedValue.of(KV.of("foo", 4), new Instant(4)),
+                    TimestampedValue.of(KV.of("bar", 3), new Instant(3)),
+                    TimestampedValue.of(KV.of("foo", 9), new Instant(9))))
+            .apply(Window.<KV<String, Integer>>into(Sessions.withGapDuration(Duration.millis(4L))));
+
+    PCollection<KV<String, Iterable<Integer>>> output =
+        windowedInput.apply(GroupByKey.<String, Integer>create());
+
+    PAssert.that(output).satisfies(containsKvs(kv("foo", 1, 4), kv("foo", 9), kv("bar", 3)));
+    PAssert.that(output)
+        .inWindow(new IntervalWindow(new Instant(1L), new Instant(8L)))
+        .satisfies(containsKvs(kv("foo", 1, 4)));
+    PAssert.that(output)
+        .inWindow(new IntervalWindow(new Instant(3L), new Instant(7L)))
+        .satisfies(containsKvs(kv("bar", 3)));
+    PAssert.that(output)
+        .inWindow(new IntervalWindow(new Instant(9L), new Instant(13L)))
+        .satisfies(containsKvs(kv("foo", 9)));
 
     p.run();
   }
 
-  static class AssertThatHasExpectedContentsForTestGroupByKeyAndWindows
-      implements SerializableFunction<Iterable<KV<String, Iterable<Integer>>>,
-                                      Void> {
+  private static KV<String, Collection<Integer>> kv(String key, Integer... values) {
+    return KV.<String, Collection<Integer>>of(key, ImmutableList.copyOf(values));
+  }
+
+  private static SerializableFunction<Iterable<KV<String, Iterable<Integer>>>, Void> containsKvs(
+      KV<String, Collection<Integer>>... kvs) {
+    return new ContainsKVs(ImmutableList.copyOf(kvs));
+  }
+
+  /**
+   * A function that asserts that the input contains exactly the expected {@link KV KVs}, in any
+   * order, where the values within each {@link KV} may also appear in any order.
+   */
+  private static class ContainsKVs
+      implements SerializableFunction<Iterable<KV<String, Iterable<Integer>>>, Void> {
+    private final List<KV<String, Collection<Integer>>> expectedKvs;
+
+    private ContainsKVs(List<KV<String, Collection<Integer>>> expectedKvs) {
+      this.expectedKvs = expectedKvs;
+    }
+
     @Override
-      public Void apply(Iterable<KV<String, Iterable<Integer>>> actual) {
-      assertThat(actual, containsInAnyOrder(
-          isKv(is("k1"), containsInAnyOrder(3)),
-          isKv(is("k1"), containsInAnyOrder(4)),
-          isKv(is("k5"), containsInAnyOrder(Integer.MAX_VALUE,
-                                            Integer.MIN_VALUE)),
-          isKv(is("k2"), containsInAnyOrder(66)),
-          isKv(is("k2"), containsInAnyOrder(-33)),
-          isKv(is("k3"), containsInAnyOrder(0))));
+    public Void apply(Iterable<KV<String, Iterable<Integer>>> input) {
+      List<Matcher<? super KV<String, Iterable<Integer>>>> matchers = new ArrayList<>();
+      for (KV<String, Collection<Integer>> expected : expectedKvs) {
+        Integer[] values = expected.getValue().toArray(new Integer[0]);
+        matchers.add(isKv(equalTo(expected.getKey()), containsInAnyOrder(values)));
+      }
+      assertThat(input, containsInAnyOrder(matchers.toArray(new Matcher[0])));
       return null;
     }
   }