You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/06/12 22:48:50 UTC

[2/2] beam git commit: Cleanup Combine Tests with Context

Cleanup Combine Tests with Context

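Split the "shared" seed (the side-input value) out of every accumulator
into its own field, so that it appears as an explicit "seed:" prefix of
the final result string. Inline the former shared TABLE fixture into each
test, and build timestamped inputs directly from TimestampedValues instead
of pairing TABLE with a separate list of timestamps.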


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f0f98c70
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f0f98c70
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f0f98c70

Branch: refs/heads/master
Commit: f0f98c70b0b01c43ce1b093c2035b20bd90ba907
Parents: 86e0489
Author: Thomas Groh <tg...@google.com>
Authored: Mon Jun 12 11:07:47 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Mon Jun 12 15:48:38 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/transforms/CombineTest.java | 225 +++++++++++++------
 1 file changed, 154 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/f0f98c70/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
index dc9788f..6a4348d 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
@@ -95,14 +95,6 @@ public class CombineTest implements Serializable {
   // This test is Serializable, just so that it's easy to have
   // anonymous inner classes inside the non-static test methods.
 
-  static final List<KV<String, Integer>> TABLE = Arrays.asList(
-    KV.of("a", 1),
-    KV.of("a", 1),
-    KV.of("a", 4),
-    KV.of("b", 1),
-    KV.of("b", 13)
-  );
-
   static final List<KV<String, Integer>> EMPTY_TABLE = Collections.emptyList();
 
   @Mock private DoFn<?, ?>.ProcessContext processContext;
@@ -168,16 +160,28 @@ public class CombineTest implements Serializable {
   @Category(ValidatesRunner.class)
   @SuppressWarnings({"rawtypes", "unchecked"})
   public void testSimpleCombine() {
-    runTestSimpleCombine(TABLE, 20, Arrays.asList(KV.of("a", "114"), KV.of("b", "113")));
+    runTestSimpleCombine(Arrays.asList(
+      KV.of("a", 1),
+      KV.of("a", 1),
+      KV.of("a", 4),
+      KV.of("b", 1),
+      KV.of("b", 13)
+    ), 20, Arrays.asList(KV.of("a", "114"), KV.of("b", "113")));
   }
 
   @Test
   @Category(ValidatesRunner.class)
   @SuppressWarnings({"rawtypes", "unchecked"})
   public void testSimpleCombineWithContext() {
-    runTestSimpleCombineWithContext(TABLE, 20,
-        Arrays.asList(KV.of("a", "01124"), KV.of("b", "01123")),
-        new String[] {"01111234"});
+    runTestSimpleCombineWithContext(Arrays.asList(
+      KV.of("a", 1),
+      KV.of("a", 1),
+      KV.of("a", 4),
+      KV.of("b", 1),
+      KV.of("b", 13)
+    ), 20,
+        Arrays.asList(KV.of("a", "20:114"), KV.of("b", "20:113")),
+        new String[] {"20:111134"});
   }
 
   @Test
@@ -216,7 +220,13 @@ public class CombineTest implements Serializable {
   @Test
   @Category(ValidatesRunner.class)
   public void testBasicCombine() {
-    runTestBasicCombine(TABLE, ImmutableSet.of(1, 13, 4), Arrays.asList(
+    runTestBasicCombine(Arrays.asList(
+      KV.of("a", 1),
+      KV.of("a", 1),
+      KV.of("a", 4),
+      KV.of("b", 1),
+      KV.of("b", 13)
+    ), ImmutableSet.of(1, 13, 4), Arrays.asList(
         KV.of("a", (Set<Integer>) ImmutableSet.of(1, 4)),
         KV.of("b", (Set<Integer>) ImmutableSet.of(1, 13))));
   }
@@ -251,9 +261,16 @@ public class CombineTest implements Serializable {
   @Category(ValidatesRunner.class)
   public void testFixedWindowsCombine() {
     PCollection<KV<String, Integer>> input =
-        pipeline.apply(Create.timestamped(TABLE, Arrays.asList(0L, 1L, 6L, 7L, 8L))
-                .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())))
-         .apply(Window.<KV<String, Integer>>into(FixedWindows.of(Duration.millis(2))));
+        pipeline
+            .apply(
+                Create.timestamped(
+                        TimestampedValue.of(KV.of("a", 1), new Instant(0L)),
+                        TimestampedValue.of(KV.of("a", 1), new Instant(1L)),
+                        TimestampedValue.of(KV.of("a", 4), new Instant(6L)),
+                        TimestampedValue.of(KV.of("b", 1), new Instant(7L)),
+                        TimestampedValue.of(KV.of("b", 13), new Instant(8L)))
+                    .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())))
+            .apply(Window.<KV<String, Integer>>into(FixedWindows.of(Duration.millis(2))));
 
     PCollection<Integer> sum = input
         .apply(Values.<Integer>create())
@@ -275,9 +292,16 @@ public class CombineTest implements Serializable {
   @Category(ValidatesRunner.class)
   public void testFixedWindowsCombineWithContext() {
     PCollection<KV<String, Integer>> perKeyInput =
-        pipeline.apply(Create.timestamped(TABLE, Arrays.asList(0L, 1L, 6L, 7L, 8L))
-                .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())))
-         .apply(Window.<KV<String, Integer>>into(FixedWindows.of(Duration.millis(2))));
+        pipeline
+            .apply(
+                Create.timestamped(
+                        TimestampedValue.of(KV.of("a", 1), new Instant(0L)),
+                        TimestampedValue.of(KV.of("a", 1), new Instant(1L)),
+                        TimestampedValue.of(KV.of("a", 4), new Instant(6L)),
+                        TimestampedValue.of(KV.of("b", 1), new Instant(7L)),
+                        TimestampedValue.of(KV.of("b", 13), new Instant(8L)))
+                    .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())))
+            .apply(Window.<KV<String, Integer>>into(FixedWindows.of(Duration.millis(2))));
 
     PCollection<Integer> globallyInput = perKeyInput.apply(Values.<Integer>create());
 
@@ -298,26 +322,33 @@ public class CombineTest implements Serializable {
 
     PAssert.that(sum).containsInAnyOrder(2, 5, 13);
     PAssert.that(combinePerKeyWithContext).containsInAnyOrder(
-        KV.of("a", "112"),
-        KV.of("a", "45"),
-        KV.of("b", "15"),
-        KV.of("b", "1133"));
-    PAssert.that(combineGloballyWithContext).containsInAnyOrder("112", "145", "1133");
+        KV.of("a", "2:11"),
+        KV.of("a", "5:4"),
+        KV.of("b", "5:1"),
+        KV.of("b", "13:13"));
+    PAssert.that(combineGloballyWithContext).containsInAnyOrder("2:11", "5:14", "13:13");
     pipeline.run();
   }
 
   @Test
   @Category(ValidatesRunner.class)
   public void testSlidingWindowsCombineWithContext() {
+    // [a: 1, 1], [a: 4; b: 1], [b: 13]
     PCollection<KV<String, Integer>> perKeyInput =
-        pipeline.apply(Create.timestamped(TABLE, Arrays.asList(2L, 3L, 8L, 9L, 10L))
-                .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())))
-         .apply(Window.<KV<String, Integer>>into(SlidingWindows.of(Duration.millis(2))));
+        pipeline
+            .apply(
+                Create.timestamped(
+                        TimestampedValue.of(KV.of("a", 1), new Instant(2L)),
+                        TimestampedValue.of(KV.of("a", 1), new Instant(3L)),
+                        TimestampedValue.of(KV.of("a", 4), new Instant(8L)),
+                        TimestampedValue.of(KV.of("b", 1), new Instant(9L)),
+                        TimestampedValue.of(KV.of("b", 13), new Instant(10L)))
+                    .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())))
+            .apply(Window.<KV<String, Integer>>into(SlidingWindows.of(Duration.millis(2))));
 
     PCollection<Integer> globallyInput = perKeyInput.apply(Values.<Integer>create());
 
-    PCollection<Integer> sum = globallyInput
-        .apply("Sum", Combine.globally(new SumInts()).withoutDefaults());
+    PCollection<Integer> sum = globallyInput.apply("Sum", Sum.integersGlobally().withoutDefaults());
 
     PCollectionView<Integer> globallySumView = sum.apply(View.<Integer>asSingleton());
 
@@ -333,16 +364,16 @@ public class CombineTest implements Serializable {
 
     PAssert.that(sum).containsInAnyOrder(1, 2, 1, 4, 5, 14, 13);
     PAssert.that(combinePerKeyWithContext).containsInAnyOrder(
-        KV.of("a", "11"),
-        KV.of("a", "112"),
-        KV.of("a", "11"),
-        KV.of("a", "44"),
-        KV.of("a", "45"),
-        KV.of("b", "15"),
-        KV.of("b", "11134"),
-        KV.of("b", "1133"));
+        KV.of("a", "1:1"),
+        KV.of("a", "2:11"),
+        KV.of("a", "1:1"),
+        KV.of("a", "4:4"),
+        KV.of("a", "5:4"),
+        KV.of("b", "5:1"),
+        KV.of("b", "14:113"),
+        KV.of("b", "13:13"));
     PAssert.that(combineGloballyWithContext).containsInAnyOrder(
-      "11", "112", "11", "44", "145", "11134", "1133");
+      "1:1", "2:11", "1:1", "4:4", "5:14", "14:113", "13:13");
     pipeline.run();
   }
 
@@ -383,9 +414,16 @@ public class CombineTest implements Serializable {
   @Category(ValidatesRunner.class)
   public void testSessionsCombine() {
     PCollection<KV<String, Integer>> input =
-        pipeline.apply(Create.timestamped(TABLE, Arrays.asList(0L, 4L, 7L, 10L, 16L))
-                .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())))
-         .apply(Window.<KV<String, Integer>>into(Sessions.withGapDuration(Duration.millis(5))));
+        pipeline
+            .apply(
+                Create.timestamped(
+                        TimestampedValue.of(KV.of("a", 1), new Instant(0L)),
+                        TimestampedValue.of(KV.of("a", 1), new Instant(4L)),
+                        TimestampedValue.of(KV.of("a", 4), new Instant(7L)),
+                        TimestampedValue.of(KV.of("b", 1), new Instant(10L)),
+                        TimestampedValue.of(KV.of("b", 13), new Instant(16L)))
+                    .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())))
+            .apply(Window.<KV<String, Integer>>into(Sessions.withGapDuration(Duration.millis(5))));
 
     PCollection<Integer> sum = input
         .apply(Values.<Integer>create())
@@ -406,7 +444,13 @@ public class CombineTest implements Serializable {
   @Category(ValidatesRunner.class)
   public void testSessionsCombineWithContext() {
     PCollection<KV<String, Integer>> perKeyInput =
-        pipeline.apply(Create.timestamped(TABLE, Arrays.asList(0L, 4L, 7L, 10L, 16L))
+        pipeline.apply(
+            Create.timestamped(
+                    TimestampedValue.of(KV.of("a", 1), new Instant(0L)),
+                    TimestampedValue.of(KV.of("a", 1), new Instant(4L)),
+                    TimestampedValue.of(KV.of("a", 4), new Instant(7L)),
+                    TimestampedValue.of(KV.of("b", 1), new Instant(10L)),
+                    TimestampedValue.of(KV.of("b", 13), new Instant(16L)))
                 .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())));
 
     PCollection<Integer> globallyInput = perKeyInput.apply(Values.<Integer>create());
@@ -429,19 +473,22 @@ public class CombineTest implements Serializable {
                         new TestCombineFnWithContext(globallyFixedWindowsView))
                     .withSideInputs(Arrays.asList(globallyFixedWindowsView)));
 
-    PCollection<String> sessionsCombineGlobally = globallyInput
-        .apply("Globally Input Sessions",
-            Window.<Integer>into(Sessions.withGapDuration(Duration.millis(5))))
-        .apply(Combine.globally(new TestCombineFnWithContext(globallyFixedWindowsView))
-            .withoutDefaults()
-            .withSideInputs(Arrays.asList(globallyFixedWindowsView)));
+    PCollection<String> sessionsCombineGlobally =
+        globallyInput
+            .apply(
+                "Globally Input Sessions",
+                Window.<Integer>into(Sessions.withGapDuration(Duration.millis(5))))
+            .apply(
+                Combine.globally(new TestCombineFnWithContext(globallyFixedWindowsView))
+                    .withoutDefaults()
+                    .withSideInputs(Arrays.asList(globallyFixedWindowsView)));
 
     PAssert.that(fixedWindowsSum).containsInAnyOrder(2, 4, 1, 13);
     PAssert.that(sessionsCombinePerKey).containsInAnyOrder(
-        KV.of("a", "1114"),
-        KV.of("b", "11"),
-        KV.of("b", "013"));
-    PAssert.that(sessionsCombineGlobally).containsInAnyOrder("11114", "013");
+        KV.of("a", "1:114"),
+        KV.of("b", "1:1"),
+        KV.of("b", "0:13"));
+    PAssert.that(sessionsCombineGlobally).containsInAnyOrder("1:1114", "0:13");
     pipeline.run();
   }
 
@@ -461,7 +508,13 @@ public class CombineTest implements Serializable {
   @Test
   @Category(ValidatesRunner.class)
   public void testAccumulatingCombine() {
-    runTestAccumulatingCombine(TABLE, 4.0, Arrays.asList(KV.of("a", 2.0), KV.of("b", 7.0)));
+    runTestAccumulatingCombine(Arrays.asList(
+      KV.of("a", 1),
+      KV.of("a", 1),
+      KV.of("a", 4),
+      KV.of("b", 1),
+      KV.of("b", 13)
+    ), 4.0, Arrays.asList(KV.of("a", 2.0), KV.of("b", 7.0)));
   }
 
   @Test
@@ -503,7 +556,13 @@ public class CombineTest implements Serializable {
   @Test
   @Category(ValidatesRunner.class)
   public void testHotKeyCombining() {
-    PCollection<KV<String, Integer>> input = copy(createInput(pipeline, TABLE), 10);
+    PCollection<KV<String, Integer>> input = copy(createInput(pipeline, Arrays.asList(
+      KV.of("a", 1),
+      KV.of("a", 1),
+      KV.of("a", 4),
+      KV.of("b", 1),
+      KV.of("b", 13)
+    )), 10);
 
     CombineFn<Integer, ?, Double> mean = new MeanInts();
     PCollection<KV<String, Double>> coldMean = input.apply("ColdMean",
@@ -560,7 +619,13 @@ public class CombineTest implements Serializable {
   @Test
   @Category(NeedsRunner.class)
   public void testBinaryCombineFn() {
-    PCollection<KV<String, Integer>> input = copy(createInput(pipeline, TABLE), 2);
+    PCollection<KV<String, Integer>> input = copy(createInput(pipeline, Arrays.asList(
+      KV.of("a", 1),
+      KV.of("a", 1),
+      KV.of("a", 4),
+      KV.of("b", 1),
+      KV.of("b", 13)
+    )), 2);
     PCollection<KV<String, Integer>> intProduct = input
         .apply("IntProduct", Combine.<String, Integer, Integer>perKey(new TestProdInt()));
     PCollection<KV<String, Integer>> objProduct = input
@@ -917,8 +982,10 @@ public class CombineTest implements Serializable {
 
     // Not serializable.
     static class Accumulator {
+      final String seed;
       String value;
-      public Accumulator(String value) {
+      public Accumulator(String seed, String value) {
+        this.seed = seed;
         this.value = value;
       }
 
@@ -933,6 +1000,7 @@ public class CombineTest implements Serializable {
           @Override
           public void encode(Accumulator accumulator, OutputStream outStream, Coder.Context context)
               throws CoderException, IOException {
+            StringUtf8Coder.of().encode(accumulator.seed, outStream, context);
             StringUtf8Coder.of().encode(accumulator.value, outStream, context);
           }
 
@@ -944,7 +1012,9 @@ public class CombineTest implements Serializable {
           @Override
           public Accumulator decode(InputStream inStream, Coder.Context context)
               throws CoderException, IOException {
-            return new Accumulator(StringUtf8Coder.of().decode(inStream, context));
+            return new Accumulator(
+                StringUtf8Coder.of().decode(inStream, context),
+                StringUtf8Coder.of().decode(inStream, context));
           }
         };
       }
@@ -958,13 +1028,13 @@ public class CombineTest implements Serializable {
 
     @Override
     public Accumulator createAccumulator() {
-      return new Accumulator("");
+      return new Accumulator("", "");
     }
 
     @Override
     public Accumulator addInput(Accumulator accumulator, Integer value) {
       try {
-        return new Accumulator(accumulator.value + String.valueOf(value));
+        return new Accumulator(accumulator.seed, accumulator.value + String.valueOf(value));
       } finally {
         accumulator.value = "cleared in addInput";
       }
@@ -972,12 +1042,18 @@ public class CombineTest implements Serializable {
 
     @Override
     public Accumulator mergeAccumulators(Iterable<Accumulator> accumulators) {
+      String seed = null;
       String all = "";
       for (Accumulator accumulator : accumulators) {
+        if (seed == null) {
+          seed = accumulator.seed;
+        } else {
+          checkArgument(seed.equals(accumulator.seed), "Different seed values in accumulator");
+        }
         all += accumulator.value;
         accumulator.value = "cleared in mergeAccumulators";
       }
-      return new Accumulator(all);
+      return new Accumulator(seed == null ? "" : seed, all);
     }
 
     @Override
@@ -1007,40 +1083,47 @@ public class CombineTest implements Serializable {
 
     @Override
     public TestCombineFn.Accumulator createAccumulator(Context c) {
-      return new TestCombineFn.Accumulator(c.sideInput(view).toString());
+      Integer sideInputValue = c.sideInput(view);
+      return new TestCombineFn.Accumulator(sideInputValue.toString(), "");
     }
 
     @Override
     public TestCombineFn.Accumulator addInput(
         TestCombineFn.Accumulator accumulator, Integer value, Context c) {
       try {
-        assertThat(accumulator.value, Matchers.startsWith(c.sideInput(view).toString()));
-        return new TestCombineFn.Accumulator(accumulator.value + String.valueOf(value));
+        assertThat(
+            "Not expecting view contents to change",
+            accumulator.seed,
+            Matchers.equalTo(Integer.toString(c.sideInput(view))));
+        return new TestCombineFn.Accumulator(
+            accumulator.seed, accumulator.value + String.valueOf(value));
       } finally {
         accumulator.value = "cleared in addInput";
       }
-
     }
 
     @Override
     public TestCombineFn.Accumulator mergeAccumulators(
         Iterable<TestCombineFn.Accumulator> accumulators, Context c) {
-      String prefix = c.sideInput(view).toString();
-      String all = prefix;
+      String sideInputValue = c.sideInput(view).toString();
+      StringBuilder all = new StringBuilder();
       for (TestCombineFn.Accumulator accumulator : accumulators) {
-        assertThat(accumulator.value, Matchers.startsWith(prefix));
-        all += accumulator.value.substring(prefix.length());
+        assertThat(
+            "Accumulators should all have the same Side Input Value",
+            accumulator.seed,
+            Matchers.equalTo(sideInputValue));
+        all.append(accumulator.value);
         accumulator.value = "cleared in mergeAccumulators";
       }
-      return new TestCombineFn.Accumulator(all);
+      return new TestCombineFn.Accumulator(sideInputValue, all.toString());
     }
 
     @Override
     public String extractOutput(TestCombineFn.Accumulator accumulator, Context c) {
-      assertThat(accumulator.value, Matchers.startsWith(c.sideInput(view).toString()));
+      assertThat(accumulator.seed, Matchers.equalTo(Integer.toString(c.sideInput(view))));
       char[] chars = accumulator.value.toCharArray();
       Arrays.sort(chars);
-      return new String(chars);
+      return accumulator.seed + ":" + new String(chars);
     }
   }