You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/10/26 16:43:47 UTC

[09/50] [abbrv] incubator-beam git commit: [BEAM-769] Spark: Make graceful stop the default.

[BEAM-769] Spark: Make graceful stop the default.

Streaming tests fail with "nothing processed" when the runtime environment is slow, because the
timeout is hit before processing is done.

Keep "pumping in" the last batch of a mocked stream (as the queue stream's default RDD) to handle
batches that overflow during a graceful stop.

Change tests accordingly.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e43228c9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e43228c9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e43228c9

Branch: refs/heads/gearpump-runner
Commit: e43228c92cd9dd8a81a28940d419b721a2aeb2d8
Parents: a9a41eb
Author: Sela <an...@paypal.com>
Authored: Fri Oct 21 01:20:33 2016 +0300
Committer: Dan Halperin <dh...@google.com>
Committed: Mon Oct 24 07:07:27 2016 -0700

----------------------------------------------------------------------
 .../streaming/StreamingEvaluationContext.java            | 11 ++++++++---
 .../translation/streaming/EmptyStreamAssertionTest.java  | 10 +++++++---
 .../streaming/ResumeFromCheckpointStreamingTest.java     |  2 +-
 .../streaming/SimpleStreamingWordCountTest.java          |  1 -
 .../translation/streaming/utils/PAssertStreaming.java    |  6 +++---
 5 files changed, 19 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e43228c9/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingEvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingEvaluationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingEvaluationContext.java
index 2652f2b..49afa26 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingEvaluationContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingEvaluationContext.java
@@ -92,6 +92,7 @@ public class StreamingEvaluationContext extends EvaluationContext {
             WindowedValue.getValueOnlyCoder(coder);
         // create the DStream from queue
         Queue<JavaRDD<WindowedValue<T>>> rddQueue = new LinkedBlockingQueue<>();
+        JavaRDD<WindowedValue<T>> lastRDD = null;
         for (Iterable<T> v : values) {
           Iterable<WindowedValue<T>> windowedValues =
               Iterables.transform(v, WindowingHelpers.<T>windowValueFunction());
@@ -99,10 +100,14 @@ public class StreamingEvaluationContext extends EvaluationContext {
               CoderHelpers.toByteArrays(windowedValues, windowCoder)).map(
                   CoderHelpers.fromByteFunction(windowCoder));
           rddQueue.offer(rdd);
+          lastRDD = rdd;
         }
-        // create dstream from queue, one at a time, no defaults
-        // mainly for unit test so no reason to have this configurable
-        dStream = jssc.queueStream(rddQueue, true);
+        // create dstream from queue, one at a time,
+        // with last as default in case batches repeat (graceful stops for example).
+        // if the stream is empty, avoid creating a default empty RDD.
+        // mainly for unit test so no reason to have this configurable.
+        dStream = lastRDD != null ? jssc.queueStream(rddQueue, true, lastRDD)
+            : jssc.queueStream(rddQueue, true);
       }
       return dStream;
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e43228c9/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java
index 1560c66..4f2a7c6 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java
@@ -34,7 +34,7 @@ import org.apache.beam.sdk.values.PCollection;
 import org.joda.time.Duration;
 import org.junit.Rule;
 import org.junit.Test;
-
+import org.junit.rules.TemporaryFolder;
 
 
 /**
@@ -48,11 +48,15 @@ public class EmptyStreamAssertionTest implements Serializable {
           + "     but: was <0>";
 
   @Rule
+  public TemporaryFolder checkpointParentDir = new TemporaryFolder();
+
+  @Rule
   public TestOptionsForStreaming commonOptions = new TestOptionsForStreaming();
 
   @Test
-  public void testFixedWindows() throws Exception {
-    SparkPipelineOptions options = commonOptions.getOptions();
+  public void testAssertion() throws Exception {
+    SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(
+        checkpointParentDir.newFolder(getClass().getSimpleName()));
     Duration windowDuration = new Duration(options.getBatchIntervalMillis());
 
     Pipeline pipeline = Pipeline.create(options);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e43228c9/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
index fc7fa34..fd1d11a 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
@@ -157,7 +157,7 @@ public class ResumeFromCheckpointStreamingTest {
 
     // requires a graceful stop so that checkpointing of the first run would finish successfully
     // before stopping and attempting to resume.
-    return PAssertStreaming.runAndAssertContents(p, formattedKV, EXPECTED, true);
+    return PAssertStreaming.runAndAssertContents(p, formattedKV, EXPECTED);
   }
 
   @AfterClass

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e43228c9/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
index 8f2dde3..4bc9a3d 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
@@ -62,7 +62,6 @@ public class SimpleStreamingWordCountTest implements Serializable {
 
   @Test
   public void testFixedWindows() throws Exception {
-
     SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(
         checkpointParentDir.newFolder(getClass().getSimpleName()));
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e43228c9/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java
index 3bf1ef6..496735d 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java
@@ -81,13 +81,13 @@ public final class PAssertStreaming implements Serializable {
   }
 
   /**
-   * Default to stop immediately, useful for most tests except for the once that may require
-   * to finish writing checkpoints for example.
+   * Default to stop gracefully so that tests will finish processing even if slower for reasons
+   * such as a slow runtime environment.
    */
   public static <T> EvaluationResult runAndAssertContents(Pipeline p,
                                                           PCollection<T> actual,
                                                           T[] expected) {
-    return runAndAssertContents(p, actual, expected, false);
+    return runAndAssertContents(p, actual, expected, true);
   }
 
   private static class AssertDoFn<T> extends OldDoFn<Iterable<T>, Void> {