You are viewing a plain-text version of this content. The canonical link to it was a hyperlink ("here") that is not preserved in this text.
Posted to commits@beam.apache.org by jb...@apache.org on 2017/07/20 19:53:28 UTC
[32/50] [abbrv] beam git commit: Add GroupByKey tests for Multiple & Merging windows
Add GroupByKey tests for Multiple & Merging windows
This gives explicit test coverage of GroupByKey when the input elements are in
multiple windows or in merging windows.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1e947045
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1e947045
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1e947045
Branch: refs/heads/DSL_SQL
Commit: 1e947045a54bd59b449fd56f8f5f50879b6d9c4c
Parents: be5b934
Author: Thomas Groh <tg...@google.com>
Authored: Mon Jul 17 13:38:11 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Jul 18 17:52:55 2017 -0700
----------------------------------------------------------------------
.../beam/sdk/transforms/GroupByKeyTest.java | 156 +++++++++++++++----
1 file changed, 122 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/1e947045/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
index 4b5d5f5..8fcb4c0 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
@@ -23,18 +23,20 @@ import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
-import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
import com.google.common.base.Function;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
@@ -56,9 +58,12 @@ import org.apache.beam.sdk.testing.ValidatesRunner;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.InvalidWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Sessions;
+import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
@@ -67,6 +72,7 @@ import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.WindowingStrategy;
+import org.hamcrest.Matcher;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Assert;
@@ -82,13 +88,13 @@ import org.junit.runners.JUnit4;
*/
@RunWith(JUnit4.class)
@SuppressWarnings({"rawtypes", "unchecked"})
-public class GroupByKeyTest {
+public class GroupByKeyTest implements Serializable {
@Rule
- public final TestPipeline p = TestPipeline.create();
+ public transient TestPipeline p = TestPipeline.create();
@Rule
- public ExpectedException thrown = ExpectedException.none();
+ public transient ExpectedException thrown = ExpectedException.none();
@Test
@Category(ValidatesRunner.class)
@@ -109,27 +115,18 @@ public class GroupByKeyTest {
PCollection<KV<String, Iterable<Integer>>> output =
input.apply(GroupByKey.<String, Integer>create());
- PAssert.that(output)
- .satisfies(new AssertThatHasExpectedContentsForTestGroupByKey());
+ SerializableFunction<Iterable<KV<String, Iterable<Integer>>>, Void> checker =
+ containsKvs(
+ kv("k1", 3, 4),
+ kv("k5", Integer.MIN_VALUE, Integer.MAX_VALUE),
+ kv("k2", 66, -33),
+ kv("k3", 0));
+ PAssert.that(output).satisfies(checker);
+ PAssert.that(output).inWindow(GlobalWindow.INSTANCE).satisfies(checker);
p.run();
}
- static class AssertThatHasExpectedContentsForTestGroupByKey
- implements SerializableFunction<Iterable<KV<String, Iterable<Integer>>>,
- Void> {
- @Override
- public Void apply(Iterable<KV<String, Iterable<Integer>>> actual) {
- assertThat(actual, containsInAnyOrder(
- isKv(is("k1"), containsInAnyOrder(3, 4)),
- isKv(is("k5"), containsInAnyOrder(Integer.MAX_VALUE,
- Integer.MIN_VALUE)),
- isKv(is("k2"), containsInAnyOrder(66, -33)),
- isKv(is("k3"), containsInAnyOrder(0))));
- return null;
- }
- }
-
@Test
@Category(ValidatesRunner.class)
public void testGroupByKeyAndWindows() {
@@ -150,24 +147,115 @@ public class GroupByKeyTest {
.apply(GroupByKey.<String, Integer>create());
PAssert.that(output)
- .satisfies(new AssertThatHasExpectedContentsForTestGroupByKeyAndWindows());
+ .satisfies(
+ containsKvs(
+ kv("k1", 3),
+ kv("k1", 4),
+ kv("k5", Integer.MAX_VALUE, Integer.MIN_VALUE),
+ kv("k2", 66),
+ kv("k2", -33),
+ kv("k3", 0)));
+ PAssert.that(output)
+ .inWindow(new IntervalWindow(new Instant(0L), Duration.millis(5L)))
+ .satisfies(
+ containsKvs(kv("k1", 3), kv("k5", Integer.MIN_VALUE, Integer.MAX_VALUE), kv("k2", 66)));
+ PAssert.that(output)
+ .inWindow(new IntervalWindow(new Instant(5L), Duration.millis(5L)))
+ .satisfies(containsKvs(kv("k1", 4), kv("k2", -33), kv("k3", 0)));
+
+ p.run();
+ }
+
+ @Test
+ @Category(ValidatesRunner.class)
+ public void testGroupByKeyMultipleWindows() {
+ PCollection<KV<String, Integer>> windowedInput =
+ p.apply(
+ Create.timestamped(
+ TimestampedValue.of(KV.of("foo", 1), new Instant(1)),
+ TimestampedValue.of(KV.of("foo", 4), new Instant(4)),
+ TimestampedValue.of(KV.of("bar", 3), new Instant(3))))
+ .apply(
+ Window.<KV<String, Integer>>into(
+ SlidingWindows.of(Duration.millis(5L)).every(Duration.millis(3L))));
+
+ PCollection<KV<String, Iterable<Integer>>> output =
+ windowedInput.apply(GroupByKey.<String, Integer>create());
+
+ PAssert.that(output)
+ .satisfies(
+ containsKvs(kv("foo", 1, 4), kv("foo", 1), kv("foo", 4), kv("bar", 3), kv("bar", 3)));
+ PAssert.that(output)
+ .inWindow(new IntervalWindow(new Instant(-3L), Duration.millis(5L)))
+ .satisfies(containsKvs(kv("foo", 1)));
+ PAssert.that(output)
+ .inWindow(new IntervalWindow(new Instant(0L), Duration.millis(5L)))
+ .satisfies(containsKvs(kv("foo", 1, 4), kv("bar", 3)));
+ PAssert.that(output)
+ .inWindow(new IntervalWindow(new Instant(3L), Duration.millis(5L)))
+ .satisfies(containsKvs(kv("foo", 4), kv("bar", 3)));
+
+ p.run();
+ }
+
+ @Test
+ @Category(ValidatesRunner.class)
+ public void testGroupByKeyMergingWindows() {
+ PCollection<KV<String, Integer>> windowedInput =
+ p.apply(
+ Create.timestamped(
+ TimestampedValue.of(KV.of("foo", 1), new Instant(1)),
+ TimestampedValue.of(KV.of("foo", 4), new Instant(4)),
+ TimestampedValue.of(KV.of("bar", 3), new Instant(3)),
+ TimestampedValue.of(KV.of("foo", 9), new Instant(9))))
+ .apply(Window.<KV<String, Integer>>into(Sessions.withGapDuration(Duration.millis(4L))));
+
+ PCollection<KV<String, Iterable<Integer>>> output =
+ windowedInput.apply(GroupByKey.<String, Integer>create());
+
+ PAssert.that(output).satisfies(containsKvs(kv("foo", 1, 4), kv("foo", 9), kv("bar", 3)));
+ PAssert.that(output)
+ .inWindow(new IntervalWindow(new Instant(1L), new Instant(8L)))
+ .satisfies(containsKvs(kv("foo", 1, 4)));
+ PAssert.that(output)
+ .inWindow(new IntervalWindow(new Instant(3L), new Instant(7L)))
+ .satisfies(containsKvs(kv("bar", 3)));
+ PAssert.that(output)
+ .inWindow(new IntervalWindow(new Instant(9L), new Instant(13L)))
+ .satisfies(containsKvs(kv("foo", 9)));
p.run();
}
- static class AssertThatHasExpectedContentsForTestGroupByKeyAndWindows
- implements SerializableFunction<Iterable<KV<String, Iterable<Integer>>>,
- Void> {
+ private static KV<String, Collection<Integer>> kv(String key, Integer... values) {
+ return KV.<String, Collection<Integer>>of(key, ImmutableList.copyOf(values));
+ }
+
+ private static SerializableFunction<Iterable<KV<String, Iterable<Integer>>>, Void> containsKvs(
+ KV<String, Collection<Integer>>... kvs) {
+ return new ContainsKVs(ImmutableList.copyOf(kvs));
+ }
+
+ /**
+ * A function that asserts that the input element contains the expected {@link KV KVs} in any
+ * order, where values appear in any order.
+ */
+ private static class ContainsKVs
+ implements SerializableFunction<Iterable<KV<String, Iterable<Integer>>>, Void> {
+ private final List<KV<String, Collection<Integer>>> expectedKvs;
+
+ private ContainsKVs(List<KV<String, Collection<Integer>>> expectedKvs) {
+ this.expectedKvs = expectedKvs;
+ }
+
@Override
- public Void apply(Iterable<KV<String, Iterable<Integer>>> actual) {
- assertThat(actual, containsInAnyOrder(
- isKv(is("k1"), containsInAnyOrder(3)),
- isKv(is("k1"), containsInAnyOrder(4)),
- isKv(is("k5"), containsInAnyOrder(Integer.MAX_VALUE,
- Integer.MIN_VALUE)),
- isKv(is("k2"), containsInAnyOrder(66)),
- isKv(is("k2"), containsInAnyOrder(-33)),
- isKv(is("k3"), containsInAnyOrder(0))));
+ public Void apply(Iterable<KV<String, Iterable<Integer>>> input) {
+ List<Matcher<? super KV<String, Iterable<Integer>>>> matchers = new ArrayList<>();
+ for (KV<String, Collection<Integer>> expected : expectedKvs) {
+ Integer[] values = expected.getValue().toArray(new Integer[0]);
+ matchers.add(isKv(equalTo(expected.getKey()), containsInAnyOrder(values)));
+ }
+ assertThat(input, containsInAnyOrder(matchers.toArray(new Matcher[0])));
return null;
}
}