You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/10/26 16:43:47 UTC
[09/50] [abbrv] incubator-beam git commit: [BEAM-769] Spark: Make
graceful stop the default.
[BEAM-769] Spark: Make graceful stop the default.
Streaming tests fail on "nothing processed" if the runtime environment is slow, because the
timeout is hit before processing is done.
Keep "pumping in" the last batch in a mocked stream to handle overflowing batches during a
graceful stop.
Change tests accordingly.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e43228c9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e43228c9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e43228c9
Branch: refs/heads/gearpump-runner
Commit: e43228c92cd9dd8a81a28940d419b721a2aeb2d8
Parents: a9a41eb
Author: Sela <an...@paypal.com>
Authored: Fri Oct 21 01:20:33 2016 +0300
Committer: Dan Halperin <dh...@google.com>
Committed: Mon Oct 24 07:07:27 2016 -0700
----------------------------------------------------------------------
.../streaming/StreamingEvaluationContext.java | 11 ++++++++---
.../translation/streaming/EmptyStreamAssertionTest.java | 10 +++++++---
.../streaming/ResumeFromCheckpointStreamingTest.java | 2 +-
.../streaming/SimpleStreamingWordCountTest.java | 1 -
.../translation/streaming/utils/PAssertStreaming.java | 6 +++---
5 files changed, 19 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e43228c9/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingEvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingEvaluationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingEvaluationContext.java
index 2652f2b..49afa26 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingEvaluationContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingEvaluationContext.java
@@ -92,6 +92,7 @@ public class StreamingEvaluationContext extends EvaluationContext {
WindowedValue.getValueOnlyCoder(coder);
// create the DStream from queue
Queue<JavaRDD<WindowedValue<T>>> rddQueue = new LinkedBlockingQueue<>();
+ JavaRDD<WindowedValue<T>> lastRDD = null;
for (Iterable<T> v : values) {
Iterable<WindowedValue<T>> windowedValues =
Iterables.transform(v, WindowingHelpers.<T>windowValueFunction());
@@ -99,10 +100,14 @@ public class StreamingEvaluationContext extends EvaluationContext {
CoderHelpers.toByteArrays(windowedValues, windowCoder)).map(
CoderHelpers.fromByteFunction(windowCoder));
rddQueue.offer(rdd);
+ lastRDD = rdd;
}
- // create dstream from queue, one at a time, no defaults
- // mainly for unit test so no reason to have this configurable
- dStream = jssc.queueStream(rddQueue, true);
+ // create dstream from queue, one at a time,
+ // with last as default in case batches repeat (graceful stops for example).
+ // if the stream is empty, avoid creating a default empty RDD.
+ // mainly for unit test so no reason to have this configurable.
+ dStream = lastRDD != null ? jssc.queueStream(rddQueue, true, lastRDD)
+ : jssc.queueStream(rddQueue, true);
}
return dStream;
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e43228c9/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java
index 1560c66..4f2a7c6 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java
@@ -34,7 +34,7 @@ import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;
import org.junit.Rule;
import org.junit.Test;
-
+import org.junit.rules.TemporaryFolder;
/**
@@ -48,11 +48,15 @@ public class EmptyStreamAssertionTest implements Serializable {
+ " but: was <0>";
@Rule
+ public TemporaryFolder checkpointParentDir = new TemporaryFolder();
+
+ @Rule
public TestOptionsForStreaming commonOptions = new TestOptionsForStreaming();
@Test
- public void testFixedWindows() throws Exception {
- SparkPipelineOptions options = commonOptions.getOptions();
+ public void testAssertion() throws Exception {
+ SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(
+ checkpointParentDir.newFolder(getClass().getSimpleName()));
Duration windowDuration = new Duration(options.getBatchIntervalMillis());
Pipeline pipeline = Pipeline.create(options);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e43228c9/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
index fc7fa34..fd1d11a 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
@@ -157,7 +157,7 @@ public class ResumeFromCheckpointStreamingTest {
// requires a graceful stop so that checkpointing of the first run would finish successfully
// before stopping and attempting to resume.
- return PAssertStreaming.runAndAssertContents(p, formattedKV, EXPECTED, true);
+ return PAssertStreaming.runAndAssertContents(p, formattedKV, EXPECTED);
}
@AfterClass
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e43228c9/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
index 8f2dde3..4bc9a3d 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
@@ -62,7 +62,6 @@ public class SimpleStreamingWordCountTest implements Serializable {
@Test
public void testFixedWindows() throws Exception {
-
SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(
checkpointParentDir.newFolder(getClass().getSimpleName()));
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e43228c9/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java
index 3bf1ef6..496735d 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java
@@ -81,13 +81,13 @@ public final class PAssertStreaming implements Serializable {
}
/**
- * Default to stop immediately, useful for most tests except for the once that may require
- * to finish writing checkpoints for example.
+ * Default to stop gracefully so that tests will finish processing even if slower for reasons
+ * such as a slow runtime environment.
*/
public static <T> EvaluationResult runAndAssertContents(Pipeline p,
PCollection<T> actual,
T[] expected) {
- return runAndAssertContents(p, actual, expected, false);
+ return runAndAssertContents(p, actual, expected, true);
}
private static class AssertDoFn<T> extends OldDoFn<Iterable<T>, Void> {