You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/04/21 01:41:48 UTC

[1/3] incubator-beam git commit: Unabbreviate some variable names

Repository: incubator-beam
Updated Branches:
  refs/heads/master 4f8cb0344 -> 506023bf5


Unabbreviate some variable names


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4c9dd225
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4c9dd225
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4c9dd225

Branch: refs/heads/master
Commit: 4c9dd22501ff931e19f41661018795d328b31295
Parents: fdec569
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Apr 20 12:58:08 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed Apr 20 12:58:08 2016 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/transforms/CombineTest.java | 92 ++++++++++----------
 1 file changed, 46 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4c9dd225/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
index b710641..38961ed 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
@@ -124,8 +124,8 @@ public class CombineTest implements Serializable {
   private void runTestSimpleCombine(KV<String, Integer>[] table,
                                     int globalSum,
                                     KV<String, String>[] perKeyCombines) {
-    Pipeline p = TestPipeline.create();
-    PCollection<KV<String, Integer>> input = createInput(p, table);
+    Pipeline pipeline = TestPipeline.create();
+    PCollection<KV<String, Integer>> input = createInput(pipeline, table);
 
     PCollection<Integer> sum = input
         .apply(Values.<Integer>create())
@@ -138,15 +138,15 @@ public class CombineTest implements Serializable {
     PAssert.that(sum).containsInAnyOrder(globalSum);
     PAssert.that(sumPerKey).containsInAnyOrder(perKeyCombines);
 
-    p.run();
+    pipeline.run();
   }
 
   private void runTestSimpleCombineWithContext(KV<String, Integer>[] table,
                                                int globalSum,
                                                KV<String, String>[] perKeyCombines,
                                                String[] globallyCombines) {
-    Pipeline p = TestPipeline.create();
-    PCollection<KV<String, Integer>> perKeyInput = createInput(p, table);
+    Pipeline pipeline = TestPipeline.create();
+    PCollection<KV<String, Integer>> perKeyInput = createInput(pipeline, table);
     PCollection<Integer> globallyInput = perKeyInput.apply(Values.<Integer>create());
 
     PCollection<Integer> sum = globallyInput.apply("Sum", Combine.globally(new SumInts()));
@@ -168,7 +168,7 @@ public class CombineTest implements Serializable {
     PAssert.that(combinePerKey).containsInAnyOrder(perKeyCombines);
     PAssert.that(combineGlobally).containsInAnyOrder(globallyCombines);
 
-    p.run();
+    pipeline.run();
   }
 
   @Test
@@ -206,9 +206,9 @@ public class CombineTest implements Serializable {
   private void runTestBasicCombine(KV<String, Integer>[] table,
                                    Set<Integer> globalUnique,
                                    KV<String, Set<Integer>>[] perKeyUnique) {
-    Pipeline p = TestPipeline.create();
-    p.getCoderRegistry().registerCoder(Set.class, SetCoder.class);
-    PCollection<KV<String, Integer>> input = createInput(p, table);
+    Pipeline pipeline = TestPipeline.create();
+    pipeline.getCoderRegistry().registerCoder(Set.class, SetCoder.class);
+    PCollection<KV<String, Integer>> input = createInput(pipeline, table);
 
     PCollection<Set<Integer>> unique = input
         .apply(Values.<Integer>create())
@@ -221,7 +221,7 @@ public class CombineTest implements Serializable {
     PAssert.that(unique).containsInAnyOrder(globalUnique);
     PAssert.that(uniquePerKey).containsInAnyOrder(perKeyUnique);
 
-    p.run();
+    pipeline.run();
   }
 
   @Test
@@ -243,8 +243,8 @@ public class CombineTest implements Serializable {
   private void runTestAccumulatingCombine(KV<String, Integer>[] table,
                                           Double globalMean,
                                           KV<String, Double>[] perKeyMeans) {
-    Pipeline p = TestPipeline.create();
-    PCollection<KV<String, Integer>> input = createInput(p, table);
+    Pipeline pipeline = TestPipeline.create();
+    PCollection<KV<String, Integer>> input = createInput(pipeline, table);
 
     PCollection<Double> mean = input
         .apply(Values.<Integer>create())
@@ -257,16 +257,16 @@ public class CombineTest implements Serializable {
     PAssert.that(mean).containsInAnyOrder(globalMean);
     PAssert.that(meanPerKey).containsInAnyOrder(perKeyMeans);
 
-    p.run();
+    pipeline.run();
   }
 
   @Test
   @Category(RunnableOnService.class)
   public void testFixedWindowsCombine() {
-    Pipeline p = TestPipeline.create();
+    Pipeline pipeline = TestPipeline.create();
 
     PCollection<KV<String, Integer>> input =
-        p.apply(Create.timestamped(Arrays.asList(TABLE),
+        pipeline.apply(Create.timestamped(Arrays.asList(TABLE),
                                    Arrays.asList(0L, 1L, 6L, 7L, 8L))
                 .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())))
          .apply(Window.<KV<String, Integer>>into(FixedWindows.of(Duration.millis(2))));
@@ -284,16 +284,16 @@ public class CombineTest implements Serializable {
         KV.of("a", "4a"),
         KV.of("b", "1b"),
         KV.of("b", "13b"));
-    p.run();
+    pipeline.run();
   }
 
   @Test
   @Category(RunnableOnService.class)
   public void testFixedWindowsCombineWithContext() {
-    Pipeline p = TestPipeline.create();
+    Pipeline pipeline = TestPipeline.create();
 
     PCollection<KV<String, Integer>> perKeyInput =
-        p.apply(Create.timestamped(Arrays.asList(TABLE),
+        pipeline.apply(Create.timestamped(Arrays.asList(TABLE),
                                    Arrays.asList(0L, 1L, 6L, 7L, 8L))
                 .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())))
          .apply(Window.<KV<String, Integer>>into(FixedWindows.of(Duration.millis(2))));
@@ -322,16 +322,16 @@ public class CombineTest implements Serializable {
         KV.of("b", "15b"),
         KV.of("b", "1133b"));
     PAssert.that(combineGloballyWithContext).containsInAnyOrder("112G", "145G", "1133G");
-    p.run();
+    pipeline.run();
   }
 
   @Test
   @Category(RunnableOnService.class)
   public void testSlidingWindowsCombineWithContext() {
-    Pipeline p = TestPipeline.create();
+    Pipeline pipeline = TestPipeline.create();
 
     PCollection<KV<String, Integer>> perKeyInput =
-        p.apply(Create.timestamped(Arrays.asList(TABLE),
+        pipeline.apply(Create.timestamped(Arrays.asList(TABLE),
                                    Arrays.asList(2L, 3L, 8L, 9L, 10L))
                 .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())))
          .apply(Window.<KV<String, Integer>>into(SlidingWindows.of(Duration.millis(2))));
@@ -365,7 +365,7 @@ public class CombineTest implements Serializable {
         KV.of("b", "1133b"));
     PAssert.that(combineGloballyWithContext).containsInAnyOrder(
       "11G", "112G", "11G", "44G", "145G", "11134G", "1133G");
-    p.run();
+    pipeline.run();
   }
 
   private static class FormatPaneInfo extends DoFn<Integer, String> {
@@ -378,8 +378,8 @@ public class CombineTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testGlobalCombineWithDefaultsAndTriggers() {
-    Pipeline p = TestPipeline.create();
-    PCollection<Integer> input = p.apply(Create.of(1, 1));
+    Pipeline pipeline = TestPipeline.create();
+    PCollection<Integer> input = pipeline.apply(Create.of(1, 1));
 
     PCollection<String> output = input
         .apply(Window.<Integer>into(new GlobalWindows())
@@ -395,10 +395,10 @@ public class CombineTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testSessionsCombine() {
-    Pipeline p = TestPipeline.create();
+    Pipeline pipeline = TestPipeline.create();
 
     PCollection<KV<String, Integer>> input =
-        p.apply(Create.timestamped(Arrays.asList(TABLE),
+        pipeline.apply(Create.timestamped(Arrays.asList(TABLE),
                                    Arrays.asList(0L, 4L, 7L, 10L, 16L))
                 .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())))
          .apply(Window.<KV<String, Integer>>into(Sessions.withGapDuration(Duration.millis(5))));
@@ -415,16 +415,16 @@ public class CombineTest implements Serializable {
         KV.of("a", "114a"),
         KV.of("b", "1b"),
         KV.of("b", "13b"));
-    p.run();
+    pipeline.run();
   }
 
   @Test
   @Category(RunnableOnService.class)
   public void testSessionsCombineWithContext() {
-    Pipeline p = TestPipeline.create();
+    Pipeline pipeline = TestPipeline.create();
 
     PCollection<KV<String, Integer>> perKeyInput =
-        p.apply(Create.timestamped(Arrays.asList(TABLE),
+        pipeline.apply(Create.timestamped(Arrays.asList(TABLE),
                                    Arrays.asList(0L, 4L, 7L, 10L, 16L))
                 .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())));
 
@@ -458,22 +458,22 @@ public class CombineTest implements Serializable {
         KV.of("b", "11b"),
         KV.of("b", "013b"));
     PAssert.that(sessionsCombineGlobally).containsInAnyOrder("11114G", "013G");
-    p.run();
+    pipeline.run();
   }
 
   @Test
   @Category(RunnableOnService.class)
   public void testWindowedCombineEmpty() {
-    Pipeline p = TestPipeline.create();
+    Pipeline pipeline = TestPipeline.create();
 
-    PCollection<Double> mean = p
+    PCollection<Double> mean = pipeline
         .apply(Create.<Integer>of().withCoder(BigEndianIntegerCoder.of()))
         .apply(Window.<Integer>into(FixedWindows.of(Duration.millis(1))))
         .apply(Combine.globally(new MeanInts()).withoutDefaults());
 
     PAssert.that(mean).empty();
 
-    p.run();
+    pipeline.run();
   }
 
   @Test
@@ -541,8 +541,8 @@ public class CombineTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testHotKeyCombining() {
-    Pipeline p = TestPipeline.create();
-    PCollection<KV<String, Integer>> input = copy(createInput(p, TABLE), 10);
+    Pipeline pipeline = TestPipeline.create();
+    PCollection<KV<String, Integer>> input = copy(createInput(pipeline, TABLE), 10);
 
     KeyedCombineFn<String, Integer, ?, Double> mean =
         new MeanInts().<String>asKeyedFn();
@@ -561,7 +561,7 @@ public class CombineTest implements Serializable {
     PAssert.that(hotMean).containsInAnyOrder(expected);
     PAssert.that(splitMean).containsInAnyOrder(expected);
 
-    p.run();
+    pipeline.run();
   }
 
   private static class GetLast extends DoFn<Integer, Integer> {
@@ -576,8 +576,8 @@ public class CombineTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testHotKeyCombiningWithAccumulationMode() {
-    Pipeline p = TestPipeline.create();
-    PCollection<Integer> input = p.apply(Create.of(1, 2, 3, 4, 5));
+    Pipeline pipeline = TestPipeline.create();
+    PCollection<Integer> input = pipeline.apply(Create.of(1, 2, 3, 4, 5));
 
     PCollection<Integer> output = input
         .apply(Window.<Integer>into(new GlobalWindows())
@@ -589,13 +589,13 @@ public class CombineTest implements Serializable {
 
     PAssert.that(output).containsInAnyOrder(15);
 
-    p.run();
+    pipeline.run();
   }
 
   @Test
   public void testBinaryCombineFn() {
-    Pipeline p = TestPipeline.create();
-    PCollection<KV<String, Integer>> input = copy(createInput(p, TABLE), 2);
+    Pipeline pipeline = TestPipeline.create();
+    PCollection<KV<String, Integer>> input = copy(createInput(pipeline, TABLE), 2);
     PCollection<KV<String, Integer>> intProduct = input
         .apply("IntProduct", Combine.<String, Integer, Integer>perKey(new TestProdInt()));
     PCollection<KV<String, Integer>> objProduct = input
@@ -605,7 +605,7 @@ public class CombineTest implements Serializable {
     PAssert.that(intProduct).containsInAnyOrder(expected);
     PAssert.that(objProduct).containsInAnyOrder(expected);
 
-    p.run();
+    pipeline.run();
   }
 
   @Test
@@ -649,12 +649,12 @@ public class CombineTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testCombineGloballyAsSingletonView() {
-    Pipeline p = TestPipeline.create();
-    final PCollectionView<Integer> view = p
+    Pipeline pipeline = TestPipeline.create();
+    final PCollectionView<Integer> view = pipeline
         .apply("CreateEmptySideInput", Create.<Integer>of().withCoder(BigEndianIntegerCoder.of()))
         .apply(Sum.integersGlobally().asSingletonView());
 
-    PCollection<Integer> output = p
+    PCollection<Integer> output = pipeline
         .apply("CreateVoidMainInput", Create.of((Void) null))
         .apply("OutputSideInput", ParDo.of(new DoFn<Void, Integer>() {
                   @Override
@@ -664,7 +664,7 @@ public class CombineTest implements Serializable {
                 }).withSideInputs(view));
 
     PAssert.thatSingleton(output).isEqualTo(0);
-    p.run();
+    pipeline.run();
   }
 
   @Test


[3/3] incubator-beam git commit: This closes #218

Posted by ke...@apache.org.
This closes #218


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/506023bf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/506023bf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/506023bf

Branch: refs/heads/master
Commit: 506023bf53a78ba38c87dd3bddcdc069461f22ea
Parents: 4f8cb03 aecf706
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Apr 20 16:41:30 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed Apr 20 16:41:30 2016 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/transforms/CombineTest.java | 106 +++++++++++--------
 1 file changed, 59 insertions(+), 47 deletions(-)
----------------------------------------------------------------------



[2/3] incubator-beam git commit: Fix erroneous test in CombineTest

Posted by ke...@apache.org.
Fix erroneous test in CombineTest

The test modified was never run, so we never learned that it was
incorrect. This change activates the test and fixes it.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/aecf7069
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/aecf7069
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/aecf7069

Branch: refs/heads/master
Commit: aecf7069a1a38845a883a226a1547320e5ae51af
Parents: 4c9dd22
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Apr 20 13:12:48 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed Apr 20 16:41:14 2016 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/transforms/CombineTest.java   | 14 +++++++++++++-
 1 file changed, 13 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aecf7069/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
index 38961ed..9fa148e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
@@ -20,8 +20,10 @@ package org.apache.beam.sdk.transforms;
 import static org.apache.beam.sdk.TestUtils.checkCombineFn;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includes;
+
 import static com.google.common.base.Preconditions.checkNotNull;
 
+import static org.hamcrest.Matchers.hasItem;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 
@@ -389,7 +391,17 @@ public class CombineTest implements Serializable {
         .apply(Sum.integersGlobally())
         .apply(ParDo.of(new FormatPaneInfo()));
 
-    PAssert.that(output).containsInAnyOrder("1: false", "2: true");
+    // The actual elements produced are nondeterministic. Could be one, could be two.
+    // But it should certainly have a final element with the correct final sum.
+    PAssert.that(output).satisfies(new SerializableFunction<Iterable<String>, Void>() {
+      @Override
+      public Void apply(Iterable<String> input) {
+        assertThat(input, hasItem("2: true"));
+        return null;
+      }
+    });
+
+    pipeline.run();
   }
 
   @Test