You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/06/19 17:23:42 UTC

[1/2] beam git commit: A few cleanups in CombineTest

Repository: beam
Updated Branches:
  refs/heads/master c12d6ba80 -> 1fb430442


A few cleanups in CombineTest

Better error messages and IntelliJ warning cleanups.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/cf654a0b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/cf654a0b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/cf654a0b

Branch: refs/heads/master
Commit: cf654a0bcd876310311f48deb64cd49d7df2893c
Parents: c12d6ba
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Jun 16 13:07:48 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Mon Jun 19 10:23:20 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/transforms/CombineTest.java | 125 ++++++++-----------
 1 file changed, 53 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/cf654a0b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
index c4ba62d..e2469ab 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
@@ -17,7 +17,7 @@
  */
 package org.apache.beam.sdk.transforms;
 
-import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 import static org.apache.beam.sdk.TestUtils.checkCombineFn;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
@@ -45,7 +45,6 @@ import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
 import org.apache.beam.sdk.coders.BigEndianLongCoder;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.coders.DoubleCoder;
 import org.apache.beam.sdk.coders.KvCoder;
@@ -85,7 +84,6 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
-import org.mockito.Mock;
 
 /**
  * Tests for Combine transforms.
@@ -97,8 +95,6 @@ public class CombineTest implements Serializable {
 
   static final List<KV<String, Integer>> EMPTY_TABLE = Collections.emptyList();
 
-  @Mock private DoFn<?, ?>.ProcessContext processContext;
-
   @Rule
   public final transient TestPipeline pipeline = TestPipeline.create();
 
@@ -142,12 +138,12 @@ public class CombineTest implements Serializable {
     PCollection<KV<String, String>> combinePerKey =
         perKeyInput.apply(
             Combine.<String, Integer, String>perKey(new TestCombineFnWithContext(globallySumView))
-                .withSideInputs(Arrays.asList(globallySumView)));
+                .withSideInputs(globallySumView));
 
     PCollection<String> combineGlobally = globallyInput
         .apply(Combine.globally(new TestCombineFnWithContext(globallySumView))
             .withoutDefaults()
-            .withSideInputs(Arrays.asList(globallySumView)));
+            .withSideInputs(globallySumView));
 
     PAssert.that(sum).containsInAnyOrder(globalSum);
     PAssert.that(combinePerKey).containsInAnyOrder(perKeyCombines);
@@ -280,11 +276,9 @@ public class CombineTest implements Serializable {
         .apply(Combine.<String, Integer, String>perKey(new TestCombineFn()));
 
     PAssert.that(sum).containsInAnyOrder(2, 5, 13);
-    PAssert.that(sumPerKey).containsInAnyOrder(
-        KV.of("a", "11"),
-        KV.of("a", "4"),
-        KV.of("b", "1"),
-        KV.of("b", "13"));
+    PAssert.that(sumPerKey)
+        .containsInAnyOrder(
+            Arrays.asList(KV.of("a", "11"), KV.of("a", "4"), KV.of("b", "1"), KV.of("b", "13")));
     pipeline.run();
   }
 
@@ -313,19 +307,18 @@ public class CombineTest implements Serializable {
     PCollection<KV<String, String>> combinePerKeyWithContext =
         perKeyInput.apply(
             Combine.<String, Integer, String>perKey(new TestCombineFnWithContext(globallySumView))
-                .withSideInputs(Arrays.asList(globallySumView)));
+                .withSideInputs(globallySumView));
 
     PCollection<String> combineGloballyWithContext = globallyInput
         .apply(Combine.globally(new TestCombineFnWithContext(globallySumView))
             .withoutDefaults()
-            .withSideInputs(Arrays.asList(globallySumView)));
+            .withSideInputs(globallySumView));
 
     PAssert.that(sum).containsInAnyOrder(2, 5, 13);
-    PAssert.that(combinePerKeyWithContext).containsInAnyOrder(
-        KV.of("a", "2:11"),
-        KV.of("a", "5:4"),
-        KV.of("b", "5:1"),
-        KV.of("b", "13:13"));
+    PAssert.that(combinePerKeyWithContext)
+        .containsInAnyOrder(
+            Arrays.asList(
+                KV.of("a", "2:11"), KV.of("a", "5:4"), KV.of("b", "5:1"), KV.of("b", "13:13")));
     PAssert.that(combineGloballyWithContext).containsInAnyOrder("2:11", "5:14", "13:13");
     pipeline.run();
   }
@@ -355,23 +348,25 @@ public class CombineTest implements Serializable {
     PCollection<KV<String, String>> combinePerKeyWithContext =
         perKeyInput.apply(
             Combine.<String, Integer, String>perKey(new TestCombineFnWithContext(globallySumView))
-                .withSideInputs(Arrays.asList(globallySumView)));
+                .withSideInputs(globallySumView));
 
     PCollection<String> combineGloballyWithContext = globallyInput
         .apply(Combine.globally(new TestCombineFnWithContext(globallySumView))
             .withoutDefaults()
-            .withSideInputs(Arrays.asList(globallySumView)));
+            .withSideInputs(globallySumView));
 
     PAssert.that(sum).containsInAnyOrder(1, 2, 1, 4, 5, 14, 13);
-    PAssert.that(combinePerKeyWithContext).containsInAnyOrder(
-        KV.of("a", "1:1"),
-        KV.of("a", "2:11"),
-        KV.of("a", "1:1"),
-        KV.of("a", "4:4"),
-        KV.of("a", "5:4"),
-        KV.of("b", "5:1"),
-        KV.of("b", "14:113"),
-        KV.of("b", "13:13"));
+    PAssert.that(combinePerKeyWithContext)
+        .containsInAnyOrder(
+            Arrays.asList(
+                KV.of("a", "1:1"),
+                KV.of("a", "2:11"),
+                KV.of("a", "1:1"),
+                KV.of("a", "4:4"),
+                KV.of("a", "5:4"),
+                KV.of("b", "5:1"),
+                KV.of("b", "14:113"),
+                KV.of("b", "13:13")));
     PAssert.that(combineGloballyWithContext).containsInAnyOrder(
       "1:1", "2:11", "1:1", "4:4", "5:14", "14:113", "13:13");
     pipeline.run();
@@ -433,10 +428,8 @@ public class CombineTest implements Serializable {
         .apply(Combine.<String, Integer, String>perKey(new TestCombineFn()));
 
     PAssert.that(sum).containsInAnyOrder(7, 13);
-    PAssert.that(sumPerKey).containsInAnyOrder(
-        KV.of("a", "114"),
-        KV.of("b", "1"),
-        KV.of("b", "13"));
+    PAssert.that(sumPerKey)
+        .containsInAnyOrder(Arrays.asList(KV.of("a", "114"), KV.of("b", "1"), KV.of("b", "13")));
     pipeline.run();
   }
 
@@ -471,7 +464,7 @@ public class CombineTest implements Serializable {
             .apply(
                 Combine.<String, Integer, String>perKey(
                         new TestCombineFnWithContext(globallyFixedWindowsView))
-                    .withSideInputs(Arrays.asList(globallyFixedWindowsView)));
+                    .withSideInputs(globallyFixedWindowsView));
 
     PCollection<String> sessionsCombineGlobally =
         globallyInput
@@ -481,13 +474,12 @@ public class CombineTest implements Serializable {
             .apply(
                 Combine.globally(new TestCombineFnWithContext(globallyFixedWindowsView))
                     .withoutDefaults()
-                    .withSideInputs(Arrays.asList(globallyFixedWindowsView)));
+                    .withSideInputs(globallyFixedWindowsView));
 
     PAssert.that(fixedWindowsSum).containsInAnyOrder(2, 4, 1, 13);
-    PAssert.that(sessionsCombinePerKey).containsInAnyOrder(
-        KV.of("a", "1:114"),
-        KV.of("b", "1:1"),
-        KV.of("b", "0:13"));
+    PAssert.that(sessionsCombinePerKey)
+        .containsInAnyOrder(
+            Arrays.asList(KV.of("a", "1:114"), KV.of("b", "1:1"), KV.of("b", "0:13")));
     PAssert.that(sessionsCombineGlobally).containsInAnyOrder("1:1114", "0:13");
     pipeline.run();
   }
@@ -716,7 +708,7 @@ public class CombineTest implements Serializable {
         pipeline
             .apply(
                 "CreateMainInput",
-                Create.<Void>timestamped(nonEmptyElement, emptyElement).withCoder(VoidCoder.of()))
+                Create.timestamped(nonEmptyElement, emptyElement).withCoder(VoidCoder.of()))
             .apply("WindowMainInput", Window.<Void>into(windowFn))
             .apply(
                 "OutputSideInput",
@@ -941,15 +933,13 @@ public class CombineTest implements Serializable {
      */
     private class CountSumCoder extends AtomicCoder<CountSum> {
       @Override
-      public void encode(CountSum value, OutputStream outStream)
-          throws CoderException, IOException {
+      public void encode(CountSum value, OutputStream outStream) throws IOException {
         LONG_CODER.encode(value.count, outStream);
         DOUBLE_CODER.encode(value.sum, outStream);
       }
 
       @Override
-      public CountSum decode(InputStream inStream)
-          throws CoderException, IOException {
+      public CountSum decode(InputStream inStream) throws IOException {
         long count = LONG_CODER.decode(inStream);
         double sum = DOUBLE_CODER.decode(inStream);
         return new CountSum(count, sum);
@@ -992,28 +982,15 @@ public class CombineTest implements Serializable {
       public static Coder<Accumulator> getCoder() {
         return new AtomicCoder<Accumulator>() {
           @Override
-          public void encode(Accumulator accumulator, OutputStream outStream)
-              throws CoderException, IOException {
-            encode(accumulator, outStream, Coder.Context.NESTED);
+          public void encode(Accumulator accumulator, OutputStream outStream) throws IOException {
+            StringUtf8Coder.of().encode(accumulator.seed, outStream);
+            StringUtf8Coder.of().encode(accumulator.value, outStream);
           }
 
           @Override
-          public void encode(Accumulator accumulator, OutputStream outStream, Coder.Context context)
-              throws CoderException, IOException {
-            StringUtf8Coder.of().encode(accumulator.seed, outStream, context.nested());
-            StringUtf8Coder.of().encode(accumulator.value, outStream, context);
-          }
-
-          @Override
-          public Accumulator decode(InputStream inStream) throws CoderException, IOException {
-            return decode(inStream, Coder.Context.NESTED);
-          }
-
-          @Override
-          public Accumulator decode(InputStream inStream, Coder.Context context)
-              throws CoderException, IOException {
-            String seed = StringUtf8Coder.of().decode(inStream, context.nested());
-            String value = StringUtf8Coder.of().decode(inStream, context);
+          public Accumulator decode(InputStream inStream) throws IOException {
+            String seed = StringUtf8Coder.of().decode(inStream);
+            String value = StringUtf8Coder.of().decode(inStream);
             return new Accumulator(seed, value);
           }
         };
@@ -1042,18 +1019,22 @@ public class CombineTest implements Serializable {
 
     @Override
     public Accumulator mergeAccumulators(Iterable<Accumulator> accumulators) {
-      String seed = null;
-      String all = "";
+      Accumulator seedAccumulator = null;
+      StringBuilder all = new StringBuilder();
       for (Accumulator accumulator : accumulators) {
-        if (seed == null) {
-          seed = accumulator.seed;
+        if (seedAccumulator == null) {
+          seedAccumulator = accumulator;
         } else {
-          checkArgument(seed.equals(accumulator.seed), "Different seed values in accumulator");
+          assertEquals(
+              String.format(
+                  "Different seed values in accumulator: %s vs. %s", seedAccumulator, accumulator),
+              seedAccumulator.seed,
+              accumulator.seed);
         }
-        all += accumulator.value;
+        all.append(accumulator.value);
         accumulator.value = "cleared in mergeAccumulators";
       }
-      return new Accumulator(seed, all);
+      return new Accumulator(checkNotNull(seedAccumulator).seed, all.toString());
     }
 
     @Override
@@ -1161,7 +1142,7 @@ public class CombineTest implements Serializable {
       @Override
       public void mergeAccumulator(Counter accumulator) {
         checkState(outputs == 0);
-        checkArgument(accumulator.outputs == 0);
+        assertEquals(0, accumulator.outputs);
 
         merges += accumulator.merges + 1;
         inputs += accumulator.inputs;


[2/2] beam git commit: This closes #3380: A few cleanups in CombineTest

Posted by jk...@apache.org.
This closes #3380: A few cleanups in CombineTest


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1fb43044
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1fb43044
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1fb43044

Branch: refs/heads/master
Commit: 1fb43044213fa3381c8534da2bc604b72d355805
Parents: c12d6ba cf654a0
Author: Eugene Kirpichov <ki...@google.com>
Authored: Mon Jun 19 10:23:25 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Mon Jun 19 10:23:25 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/transforms/CombineTest.java | 125 ++++++++-----------
 1 file changed, 53 insertions(+), 72 deletions(-)
----------------------------------------------------------------------