You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by am...@apache.org on 2016/09/22 15:30:41 UTC

[1/2] incubator-beam git commit: [BEAM-613] Revised SimpleStreamingWordCountTest to better test fixed windows.

Repository: incubator-beam
Updated Branches:
  refs/heads/master 843275210 -> 6082ebcce


[BEAM-613] Revised SimpleStreamingWordCountTest to better test fixed windows.

Revised the test to test multiple batches

Set the timeout to 1 ms since it essentially plays no role here.
Removed blank lines between imports.

Refactored the timeout related stuff to make it more natural from Beam model's perspective.

Fix windowing bug.

Expected result is for the entire window.

Renamed the test to better reflect the use case it's testing.

Fixed a typo.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b1474a18
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b1474a18
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b1474a18

Branch: refs/heads/master
Commit: b1474a18c4fe3b3aefdb6cd364fce9dfc227b6df
Parents: 8432752
Author: Stas Levin <st...@gmail.com>
Authored: Mon Sep 5 18:22:59 2016 +0300
Committer: Sela <an...@paypal.com>
Committed: Thu Sep 22 18:18:19 2016 +0300

----------------------------------------------------------------------
 .../streaming/StreamingTransformTranslator.java | 28 +++++------
 .../streaming/SimpleStreamingWordCountTest.java | 49 +++++++++++++-------
 2 files changed, 46 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b1474a18/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
index 64ddc57..9cb377d 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
@@ -191,27 +191,29 @@ public final class StreamingTransformTranslator {
         @SuppressWarnings("unchecked")
         JavaDStream<WindowedValue<T>> dStream =
             (JavaDStream<WindowedValue<T>>) sec.getStream(transform);
+        // get the right window durations.
+        Duration windowDuration;
+        Duration slideDuration;
         if (windowFn instanceof FixedWindows) {
-          Duration windowDuration = Durations.milliseconds(((FixedWindows) windowFn).getSize()
-              .getMillis());
-          sec.setStream(transform, dStream.window(windowDuration));
+          windowDuration = Durations.milliseconds(((FixedWindows) windowFn).getSize().getMillis());
+          slideDuration = windowDuration;
         } else if (windowFn instanceof SlidingWindows) {
-          Duration windowDuration = Durations.milliseconds(((SlidingWindows) windowFn).getSize()
-              .getMillis());
-          Duration slideDuration = Durations.milliseconds(((SlidingWindows) windowFn).getPeriod()
-              .getMillis());
-          sec.setStream(transform, dStream.window(windowDuration, slideDuration));
+          SlidingWindows slidingWindows = (SlidingWindows) windowFn;
+          windowDuration = Durations.milliseconds(slidingWindows.getSize().getMillis());
+          slideDuration = Durations.milliseconds(slidingWindows.getPeriod().getMillis());
+        } else {
+          throw new UnsupportedOperationException(String.format("WindowFn %s is not supported.",
+              windowFn.getClass().getCanonicalName()));
         }
+        JavaDStream<WindowedValue<T>> windowedDStream =
+            dStream.window(windowDuration, slideDuration);
         //--- then we apply windowing to the elements
-        @SuppressWarnings("unchecked")
-        JavaDStream<WindowedValue<T>> dStream2 =
-            (JavaDStream<WindowedValue<T>>) sec.getStream(transform);
         if (TranslationUtils.skipAssignWindows(transform, context)) {
-          sec.setStream(transform, dStream2);
+          sec.setStream(transform, windowedDStream);
         } else {
           final OldDoFn<T, T> addWindowsDoFn = new AssignWindowsDoFn<>(windowFn);
           final SparkRuntimeContext runtimeContext = sec.getRuntimeContext();
-          JavaDStream<WindowedValue<T>> outStream = dStream2.transform(
+          JavaDStream<WindowedValue<T>> outStream = windowedDStream.transform(
               new Function<JavaRDD<WindowedValue<T>>, JavaRDD<WindowedValue<T>>>() {
             @Override
             public JavaRDD<WindowedValue<T>> call(JavaRDD<WindowedValue<T>> rdd) throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b1474a18/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
index 1464273..d505878 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
@@ -17,10 +17,9 @@
  */
 package org.apache.beam.runners.spark.translation.streaming;
 
-
+import com.google.common.collect.Lists;
 import java.io.Serializable;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.List;
 import org.apache.beam.runners.spark.EvaluationResult;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
@@ -45,33 +44,47 @@ import org.junit.rules.TemporaryFolder;
  */
 public class SimpleStreamingWordCountTest implements Serializable {
 
-  private static final String[] WORDS_ARRAY = {
-      "hi there", "hi", "hi sue bob", "hi sue", "", "bob hi"};
-  private static final List<Iterable<String>> WORDS_QUEUE =
-      Collections.<Iterable<String>>singletonList(Arrays.asList(WORDS_ARRAY));
-  private static final String[] EXPECTED_COUNTS = {"hi: 5", "there: 1", "sue: 2", "bob: 2"};
-
   @Rule
   public TemporaryFolder checkpointParentDir = new TemporaryFolder();
 
   @Rule
   public TestOptionsForStreaming commonOptions = new TestOptionsForStreaming();
 
+  private static final String[] WORDS = {"hi there", "hi", "hi sue bob", "hi sue", "", "bob hi"};
+
+  private static final List<Iterable<String>> MANY_WORDS =
+      Lists.<Iterable<String>>newArrayList(Arrays.asList(WORDS), Arrays.asList(WORDS));
+
+  private static final String[] EXPECTED_WORD_COUNTS = {"hi: 10", "there: 2", "sue: 4", "bob: 4"};
+
+  private static final Duration BATCH_INTERVAL = Duration.standardSeconds(1);
+
+  private static final Duration windowDuration = BATCH_INTERVAL.multipliedBy(2);
+
   @Test
-  public void testRun() throws Exception {
+  public void testFixedWindows() throws Exception {
+
     SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(
         checkpointParentDir.newFolder(getClass().getSimpleName()));
 
-    Pipeline p = Pipeline.create(options);
-    PCollection<String> inputWords =
-        p.apply(CreateStream.fromQueue(WORDS_QUEUE)).setCoder(StringUtf8Coder.of());
-    PCollection<String> windowedWords = inputWords
-        .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(1))));
-    PCollection<String> output = windowedWords.apply(new WordCount.CountWords())
-        .apply(MapElements.via(new WordCount.FormatAsTextFn()));
+    // override defaults
+    options.setBatchIntervalMillis(BATCH_INTERVAL.getMillis());
+    // graceful stop is on, so no worries about the timeout and window being equal
+    options.setTimeout(windowDuration.getMillis());
+
+    Pipeline pipeline = Pipeline.create(options);
+
+    PCollection<String> output =
+        pipeline
+            .apply(CreateStream.fromQueue(MANY_WORDS))
+            .setCoder(StringUtf8Coder.of())
+            .apply(Window.<String>into(FixedWindows.of(windowDuration)))
+            .apply(new WordCount.CountWords())
+            .apply(MapElements.via(new WordCount.FormatAsTextFn()));
+
+    PAssertStreaming.assertContents(output, EXPECTED_WORD_COUNTS);
 
-    PAssertStreaming.assertContents(output, EXPECTED_COUNTS);
-    EvaluationResult res = (EvaluationResult) p.run();
+    EvaluationResult res = (EvaluationResult) pipeline.run();
     res.close();
   }
 }


[2/2] incubator-beam git commit: This closes #982

Posted by am...@apache.org.
This closes #982


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6082ebcc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6082ebcc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6082ebcc

Branch: refs/heads/master
Commit: 6082ebccedec076140720aefdb8f35e263847082
Parents: 8432752 b1474a1
Author: Sela <an...@paypal.com>
Authored: Thu Sep 22 18:19:43 2016 +0300
Committer: Sela <an...@paypal.com>
Committed: Thu Sep 22 18:19:43 2016 +0300

----------------------------------------------------------------------
 .../streaming/StreamingTransformTranslator.java | 28 +++++------
 .../streaming/SimpleStreamingWordCountTest.java | 49 +++++++++++++-------
 2 files changed, 46 insertions(+), 31 deletions(-)
----------------------------------------------------------------------