You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by am...@apache.org on 2016/03/27 04:07:18 UTC

[1/2] incubator-beam git commit: [BEAM-109] fix support for FixedWindows and SlidingWindows in batch

Repository: incubator-beam
Updated Branches:
  refs/heads/master 49d82baf1 -> 706fc5376


[BEAM-109] fix support for FixedWindows and SlidingWindows in batch

[BEAM-109] Better testing for FixedWindows and SlidingWindows

[BEAM-109] lower counts are unordered, so it is better to compare the entire result and not just the iterator head


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/18242651
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/18242651
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/18242651

Branch: refs/heads/master
Commit: 182426516c5184e126888d3b465dc41a1bb312aa
Parents: 49d82ba
Author: Sela <an...@paypal.com>
Authored: Sat Mar 19 12:25:02 2016 +0200
Committer: Sela <an...@paypal.com>
Committed: Sat Mar 26 18:58:33 2016 -0700

----------------------------------------------------------------------
 .../spark/translation/TransformTranslator.java  | 22 ++++---
 .../translation/MultiOutputWordCountTest.java   | 19 +++++-
 .../translation/WindowedWordCountTest.java      | 63 ++++++++++++++++----
 3 files changed, 82 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/18242651/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index 0bd047c..7f72235 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -30,6 +30,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
+import com.google.api.client.util.Lists;
 import com.google.api.client.util.Maps;
 import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException;
 import com.google.cloud.dataflow.sdk.coders.Coder;
@@ -79,6 +80,7 @@ import org.apache.spark.api.java.JavaRDDLike;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
 import org.apache.spark.api.java.function.PairFunction;
 import scala.Tuple2;
 
@@ -267,15 +269,21 @@ public final class TransformTranslator {
 
         // Key has to bw windowed in order to group by window as well
         JavaPairRDD<WindowedValue<K>, WindowedValue<KV<K, VI>>> inRddDuplicatedKeyPair =
-            inRdd.mapToPair(
-                new PairFunction<WindowedValue<KV<K, VI>>, WindowedValue<K>,
+            inRdd.flatMapToPair(
+                new PairFlatMapFunction<WindowedValue<KV<K, VI>>, WindowedValue<K>,
                     WindowedValue<KV<K, VI>>>() {
                   @Override
-                  public Tuple2<WindowedValue<K>,
-                      WindowedValue<KV<K, VI>>> call(WindowedValue<KV<K, VI>> kv) {
-                    WindowedValue<K> wk = WindowedValue.of(kv.getValue().getKey(),
-                        kv.getTimestamp(), kv.getWindows(), kv.getPane());
-                    return new Tuple2<>(wk, kv);
+                  public Iterable<Tuple2<WindowedValue<K>,
+                      WindowedValue<KV<K, VI>>>> call(WindowedValue<KV<K, VI>> kv) {
+                      List<Tuple2<WindowedValue<K>,
+                          WindowedValue<KV<K, VI>>>> tuple2s =
+                          Lists.newArrayListWithCapacity(kv.getWindows().size());
+                      for (BoundedWindow boundedWindow: kv.getWindows()) {
+                        WindowedValue<K> wk = WindowedValue.of(kv.getValue().getKey(),
+                            boundedWindow.maxTimestamp(), boundedWindow, kv.getPane());
+                        tuple2s.add(new Tuple2<>(wk, kv));
+                      }
+                    return tuple2s;
                   }
                 });
         //-- windowed coders

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/18242651/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java
index 8ab3798..974467f 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java
@@ -22,14 +22,18 @@ import com.google.cloud.dataflow.sdk.Pipeline;
 import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
 import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
 import com.google.cloud.dataflow.sdk.runners.AggregatorValues;
+import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
 import com.google.cloud.dataflow.sdk.transforms.*;
 import com.google.cloud.dataflow.sdk.values.*;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import org.apache.beam.runners.spark.EvaluationResult;
 import org.apache.beam.runners.spark.SparkPipelineRunner;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.Set;
+
 public class MultiOutputWordCountTest {
 
   private static final TupleTag<String> upper = new TupleTag<>();
@@ -37,6 +41,10 @@ public class MultiOutputWordCountTest {
   private static final TupleTag<KV<String, Long>> lowerCnts = new TupleTag<>();
   private static final TupleTag<KV<String, Long>> upperCnts = new TupleTag<>();
 
+  private static final Set<String> EXPECTED_LOWER_COUNTS =
+      ImmutableSet.of("are: 2", "some: 3", "words: 3", "more: 2", "to: 1", "count: 1", "and: 2",
+      "even: 1", "others: 1");
+
   @Test
   public void testRun() throws Exception {
     Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
@@ -53,8 +61,8 @@ public class MultiOutputWordCountTest {
         ApproximateUnique.<KV<String, Long>>globally(16));
 
     EvaluationResult res = SparkPipelineRunner.create().run(p);
-    Iterable<KV<String, Long>> actualLower = res.get(luc.get(lowerCnts));
-    Assert.assertEquals("are", actualLower.iterator().next().getKey());
+    DataflowAssert.that(luc.get(lowerCnts).apply(ParDo.of(new FormatCountsFn())))
+        .containsInAnyOrder(EXPECTED_LOWER_COUNTS);
     Iterable<KV<String, Long>> actualUpper = res.get(luc.get(upperCnts));
     Assert.assertEquals("Here", actualUpper.iterator().next().getKey());
     Iterable<Long> actualUniqCount = res.get(unique);
@@ -134,4 +142,11 @@ public class MultiOutputWordCountTest {
       return extractWordsFn.totalWords;
     }
   }
+
+  private static class FormatCountsFn extends DoFn<KV<String, Long>, String> {
+    @Override
+    public void processElement(ProcessContext c) {
+      c.output(c.element().getKey() + ": " + c.element().getValue());
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/18242651/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java
index 9f29a37..9fac9c6 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java
@@ -24,6 +24,7 @@ import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
 import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
 import com.google.cloud.dataflow.sdk.transforms.Create;
 import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
+import com.google.cloud.dataflow.sdk.transforms.windowing.SlidingWindows;
 import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
 import com.google.cloud.dataflow.sdk.values.PCollection;
 import com.google.common.collect.ImmutableList;
@@ -39,30 +40,66 @@ import org.junit.Test;
 
 public class WindowedWordCountTest {
   private static final String[] WORDS_ARRAY = {
-          "hi there", "hi", "hi sue bob",
-          "hi sue", "", "bob hi"};
+      "hi there", "hi", "hi sue bob", "hi sue", "", "bob hi"};
   private static final Long[] TIMESTAMPS_ARRAY = {
-          60000L, 60000L, 60000L,
-          120000L, 120000L, 120000L};
+      60000L, 60000L, 60000L, 179000L, 179000L, 179000L};
   private static final List<String> WORDS = Arrays.asList(WORDS_ARRAY);
   private static final List<Long> TIMESTAMPS = Arrays.asList(TIMESTAMPS_ARRAY);
-  private static final List<String> EXPECTED_COUNT_SET =
-          ImmutableList.of("hi: 3", "there: 1", "sue: 1", "bob: 1",
-                  "hi: 2", "sue: 1", "bob: 1");
+
+  private static final List<String> EXPECTED_FIXED_SEPARATE_COUNT_SET =
+      ImmutableList.of("hi: 3", "there: 1", "sue: 1", "bob: 1", "hi: 2", "sue: 1", "bob: 1");
+
+  @Test
+  public void testFixed() throws Exception {
+    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
+    PCollection<String> inputWords =
+        p.apply(Create.timestamped(WORDS, TIMESTAMPS)).setCoder(StringUtf8Coder.of());
+    PCollection<String> windowedWords =
+        inputWords.apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))));
+
+    PCollection<String> output = windowedWords.apply(new SimpleWordCountTest.CountWords());
+
+    DataflowAssert.that(output).containsInAnyOrder(EXPECTED_FIXED_SEPARATE_COUNT_SET);
+
+    EvaluationResult res = SparkPipelineRunner.create().run(p);
+    res.close();
+  }
+
+  private static final List<String> EXPECTED_FIXED_SAME_COUNT_SET =
+      ImmutableList.of("hi: 5", "there: 1", "sue: 2", "bob: 2");
+
+  @Test
+  public void testFixed2() throws Exception {
+    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
+    PCollection<String> inputWords = p.apply(Create.timestamped(WORDS, TIMESTAMPS))
+        .setCoder(StringUtf8Coder.of());
+    PCollection<String> windowedWords = inputWords
+        .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(5))));
+
+    PCollection<String> output = windowedWords.apply(new SimpleWordCountTest.CountWords());
+
+    DataflowAssert.that(output).containsInAnyOrder(EXPECTED_FIXED_SAME_COUNT_SET);
+
+    EvaluationResult res = SparkPipelineRunner.create().run(p);
+    res.close();
+  }
+
+  private static final List<String> EXPECTED_SLIDING_COUNT_SET =
+      ImmutableList.of("hi: 3", "there: 1", "sue: 1", "bob: 1", "hi: 5", "there: 1", "sue: 2",
+      "bob: 2", "hi: 2", "sue: 1", "bob: 1");
 
   @Test
-  public void testRun() throws Exception {
-    SparkPipelineOptions options = SparkPipelineOptionsFactory.create();
-    options.setRunner(SparkPipelineRunner.class);
+  public void testSliding() throws Exception {
     Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
     PCollection<String> inputWords = p.apply(Create.timestamped(WORDS, TIMESTAMPS))
-            .setCoder(StringUtf8Coder.of());
+        .setCoder(StringUtf8Coder.of());
     PCollection<String> windowedWords = inputWords
-            .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))));
+        .apply(Window.<String>into(SlidingWindows.of(Duration.standardMinutes(2))
+        .every(Duration.standardMinutes(1))));
 
     PCollection<String> output = windowedWords.apply(new SimpleWordCountTest.CountWords());
 
-    DataflowAssert.that(output).containsInAnyOrder(EXPECTED_COUNT_SET);
+    DataflowAssert.that(output).containsInAnyOrder(EXPECTED_SLIDING_COUNT_SET);
 
     EvaluationResult res = SparkPipelineRunner.create().run(p);
     res.close();


[2/2] incubator-beam git commit: [BEAM-109] Combine.PerKey ignores grouping also by windows

Posted by am...@apache.org.
[BEAM-109] Combine.PerKey ignores grouping also by windows

This closes #63


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/706fc537
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/706fc537
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/706fc537

Branch: refs/heads/master
Commit: 706fc537645c64adf87f9003fc6d8e43b0ece843
Parents: 49d82ba 1824265
Author: Sela <an...@paypal.com>
Authored: Sat Mar 26 19:01:42 2016 -0700
Committer: Sela <an...@paypal.com>
Committed: Sat Mar 26 19:01:42 2016 -0700

----------------------------------------------------------------------
 .../spark/translation/TransformTranslator.java  | 22 ++++---
 .../translation/MultiOutputWordCountTest.java   | 19 +++++-
 .../translation/WindowedWordCountTest.java      | 63 ++++++++++++++++----
 3 files changed, 82 insertions(+), 22 deletions(-)
----------------------------------------------------------------------