You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by am...@apache.org on 2016/11/20 17:47:39 UTC
[1/2] incubator-beam git commit: [Beam-1001] Add non blocking
cancel() and waitUntilFinish() for streaming applications. remove timeout
parameter in spark pipeline option.
Repository: incubator-beam
Updated Branches:
refs/heads/master d93e9a88b -> 875631f07
[Beam-1001] Add non blocking cancel() and waitUntilFinish() for streaming applications.
remove timeout parameter in spark pipeline option.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/dafd5be7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/dafd5be7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/dafd5be7
Branch: refs/heads/master
Commit: dafd5be7f69f191fc9edb8b9f9aec010ca368f50
Parents: d93e9a8
Author: ksalant <ks...@payapal.com>
Authored: Sun Nov 20 11:57:16 2016 +0200
Committer: Sela <an...@paypal.com>
Committed: Sun Nov 20 19:25:52 2016 +0200
----------------------------------------------------------------------
.../runners/spark/SparkPipelineOptions.java | 6 --
.../apache/beam/runners/spark/SparkRunner.java | 5 +-
.../spark/translation/EvaluationContext.java | 59 +++++++++++++-------
.../SparkRunnerStreamingContextFactory.java | 3 +-
.../streaming/EmptyStreamAssertionTest.java | 3 +-
.../streaming/FlattenStreamingTest.java | 4 +-
.../streaming/KafkaStreamingTest.java | 11 ++--
.../ResumeFromCheckpointStreamingTest.java | 3 +-
.../streaming/SimpleStreamingWordCountTest.java | 4 +-
.../streaming/utils/PAssertStreaming.java | 8 ++-
.../SparkTestPipelineOptionsForStreaming.java | 1 -
11 files changed, 60 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dafd5be7/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
index b1ebde9..0fd790e 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
@@ -39,12 +39,6 @@ public interface SparkPipelineOptions
String getSparkMaster();
void setSparkMaster(String master);
- @Description("Timeout to wait (in msec) for a streaming execution to stop, -1 runs until "
- + "execution is stopped")
- @Default.Long(-1)
- Long getTimeout();
- void setTimeout(Long timeoutMillis);
-
@Description("Batch interval for Spark streaming in milliseconds.")
@Default.Long(1000)
Long getBatchIntervalMillis();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dafd5be7/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
index 6bbef39..e800071 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
@@ -137,11 +137,8 @@ public final class SparkRunner extends PipelineRunner<EvaluationResult> {
// if recovering from checkpoint, we have to reconstruct the EvaluationResult instance.
return contextFactory.getCtxt() == null ? new EvaluationContext(jssc.sc(),
- pipeline, jssc, mOptions.getTimeout()) : contextFactory.getCtxt();
+ pipeline, jssc) : contextFactory.getCtxt();
} else {
- if (mOptions.getTimeout() > 0) {
- LOG.info("Timeout is ignored by the SparkRunner in batch.");
- }
JavaSparkContext jsc = SparkContextFactory.getSparkContext(mOptions);
EvaluationContext ctxt = new EvaluationContext(jsc, pipeline);
SparkPipelineTranslator translator = new TransformTranslator.Translator();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dafd5be7/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
index aaf7573..1183fbb 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
@@ -45,6 +45,7 @@ import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.PValue;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.streaming.StreamingContextState;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.joda.time.Duration;
@@ -57,7 +58,6 @@ public class EvaluationContext implements EvaluationResult {
private JavaStreamingContext jssc;
private final SparkRuntimeContext runtime;
private final Pipeline pipeline;
- private long timeout;
private final Map<PValue, Dataset> datasets = new LinkedHashMap<>();
private final Map<PValue, Dataset> pcollections = new LinkedHashMap<>();
private final Set<Dataset> leaves = new LinkedHashSet<>();
@@ -76,10 +76,9 @@ public class EvaluationContext implements EvaluationResult {
}
public EvaluationContext(JavaSparkContext jsc, Pipeline pipeline,
- JavaStreamingContext jssc, long timeout) {
+ JavaStreamingContext jssc) {
this(jsc, pipeline);
this.jssc = jssc;
- this.timeout = timeout;
this.state = State.RUNNING;
}
@@ -226,18 +225,14 @@ public class EvaluationContext implements EvaluationResult {
@Override
public void close(boolean gracefully) {
- if (isStreamingPipeline()) {
- // stop streaming context
- if (timeout > 0) {
- jssc.awaitTerminationOrTimeout(timeout);
- } else {
- jssc.awaitTermination();
+ // Stopping streaming job if running
+ if (isStreamingPipeline() && !state.isTerminal()) {
+ try {
+ cancel(gracefully);
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to cancel streaming job", e);
}
- // stop streaming context gracefully, so checkpointing (and other computations) get to
- // finish before shutdown.
- jssc.stop(false, gracefully);
}
- state = State.DONE;
SparkContextFactory.stopSparkContext(jsc);
}
@@ -248,21 +243,47 @@ public class EvaluationContext implements EvaluationResult {
@Override
public State cancel() throws IOException {
- throw new UnsupportedOperationException(
- "Spark runner EvaluationContext does not support cancel.");
+ return cancel(true);
+ }
+
+ private State cancel(boolean gracefully) throws IOException {
+ if (isStreamingPipeline()) {
+ if (!state.isTerminal()) {
+ jssc.stop(false, gracefully);
+ state = State.CANCELLED;
+ }
+ return state;
+ } else {
+ // Batch is currently blocking so
+ // there is no way to cancel a batch job
+ // will be handled at BEAM-1000
+ throw new UnsupportedOperationException(
+ "Spark runner EvaluationContext does not support cancel.");
+ }
}
@Override
public State waitUntilFinish() {
- return waitUntilFinish(Duration.millis(-1));
+ return waitUntilFinish(Duration.ZERO);
}
@Override
public State waitUntilFinish(Duration duration) {
if (isStreamingPipeline()) {
- throw new UnsupportedOperationException(
- "Spark runner EvaluationContext does not support waitUntilFinish for streaming "
- + "pipelines.");
+ // According to PipelineResult: Provide a value less than 1 ms for an infinite wait
+ if (duration.getMillis() < 1L) {
+ jssc.awaitTermination();
+ state = State.DONE;
+ } else {
+ jssc.awaitTermination(duration.getMillis());
+ // According to PipelineResult: The final state of the pipeline or null on timeout
+ if (jssc.getState().equals(StreamingContextState.STOPPED)) {
+ state = State.DONE;
+ } else {
+ return null;
+ }
+ }
+ return state;
} else {
// This is no-op, since Spark runner in batch is blocking.
// It needs to be updated once SparkRunner supports non-blocking execution:
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dafd5be7/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
index 01398e4..af90ff1 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
@@ -73,8 +73,7 @@ public class SparkRunnerStreamingContextFactory implements JavaStreamingContextF
JavaSparkContext jsc = SparkContextFactory.getSparkContext(options);
JavaStreamingContext jssc = new JavaStreamingContext(jsc, batchDuration);
- ctxt = new EvaluationContext(jsc, pipeline, jssc,
- options.getTimeout());
+ ctxt = new EvaluationContext(jsc, pipeline, jssc);
pipeline.traverseTopologically(new SparkRunner.Evaluator(translator, ctxt));
ctxt.computeOutputs();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dafd5be7/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java
index ec75eb7..d40bcff 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java
@@ -68,7 +68,8 @@ public class EmptyStreamAssertionTest implements Serializable {
.apply(Window.<String>into(FixedWindows.of(windowDuration)));
try {
- PAssertStreaming.runAndAssertContents(pipeline, output, new String[0]);
+ PAssertStreaming.runAndAssertContents(pipeline, output, new String[0],
+ Duration.standardSeconds(1L));
} catch (AssertionError e) {
assertTrue("Expected error message: " + EXPECTED_ERR + " but got: " + e.getMessage(),
e.getMessage().equals(EXPECTED_ERR));
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dafd5be7/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
index f69bd7f..3e75b18 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
@@ -76,7 +76,7 @@ public class FlattenStreamingTest {
PCollectionList<String> list = PCollectionList.of(windowedW1).and(windowedW2);
PCollection<String> union = list.apply(Flatten.<String>pCollections());
- PAssertStreaming.runAndAssertContents(p, union, EXPECTED_UNION);
+ PAssertStreaming.runAndAssertContents(p, union, EXPECTED_UNION, Duration.standardSeconds(1L));
}
@Test
@@ -95,7 +95,7 @@ public class FlattenStreamingTest {
PCollectionList<String> list = PCollectionList.of(windowedW1).and(windowedW2);
PCollection<String> union = list.apply(Flatten.<String>pCollections());
- PAssertStreaming.runAndAssertContents(p, union, EXPECTED_UNION);
+ PAssertStreaming.runAndAssertContents(p, union, EXPECTED_UNION, Duration.standardSeconds(1L));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dafd5be7/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
index 6b2486b..d55ed39 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
@@ -118,7 +118,7 @@ public class KafkaStreamingTest {
.apply(ParDo.of(new FormatKVFn()))
.apply(Distinct.<String>create());
- PAssertStreaming.runAndAssertContents(p, deduped, expected);
+ PAssertStreaming.runAndAssertContents(p, deduped, expected, Duration.standardSeconds(1L));
}
@Test
@@ -143,10 +143,6 @@ public class KafkaStreamingTest {
// It seems that the consumer's first "position" lookup (in unit test) takes +200 msec,
// so to be on the safe side we'll set to 750 msec.
options.setMinReadTimeMillis(750L);
- // run for more than 1 batch interval, so that reading of latest is attempted in the
- // first batch with no luck, while the OnBatchCompleted injected-input afterwards will be read
- // in the second interval.
- options.setTimeout(Duration.standardSeconds(3).getMillis());
//------- test: read and format.
Pipeline p = Pipeline.create(options);
@@ -168,7 +164,10 @@ public class KafkaStreamingTest {
.apply(Window.<KV<String, String>>into(FixedWindows.of(batchAndWindowDuration)))
.apply(ParDo.of(new FormatKVFn()));
- PAssertStreaming.runAndAssertContents(p, formatted, expected);
+ // run for more than 1 batch interval, so that reading of latest is attempted in the
+ // first batch with no luck, while the OnBatchCompleted injected-input afterwards will be read
+ // in the second interval.
+ PAssertStreaming.runAndAssertContents(p, formatted, expected, Duration.standardSeconds(3));
}
private static void produce(String topic, Map<String, String> messages) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dafd5be7/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
index af93d84..b57787f 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
@@ -163,7 +163,8 @@ public class ResumeFromCheckpointStreamingTest {
// requires a graceful stop so that checkpointing of the first run would finish successfully
// before stopping and attempting to resume.
- return PAssertStreaming.runAndAssertContents(p, formattedKV, EXPECTED);
+ return PAssertStreaming.runAndAssertContents(p, formattedKV, EXPECTED,
+ Duration.standardSeconds(1L));
}
@AfterClass
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dafd5be7/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
index 4c503c4..9a15ff2 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
@@ -67,8 +67,6 @@ public class SimpleStreamingWordCountTest implements Serializable {
// override defaults
options.setBatchIntervalMillis(BATCH_INTERVAL.getMillis());
- // graceful stop is on, so no worries about the timeout and window being equal
- options.setTimeout(windowDuration.getMillis());
Pipeline pipeline = Pipeline.create(options);
@@ -80,6 +78,6 @@ public class SimpleStreamingWordCountTest implements Serializable {
.apply(new WordCount.CountWords())
.apply(MapElements.via(new WordCount.FormatAsTextFn()));
- PAssertStreaming.runAndAssertContents(pipeline, output, EXPECTED_WORD_COUNTS);
+ PAssertStreaming.runAndAssertContents(pipeline, output, EXPECTED_WORD_COUNTS, windowDuration);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dafd5be7/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java
index 496735d..23aca43 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java
@@ -34,6 +34,7 @@ import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Duration;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,6 +58,7 @@ public final class PAssertStreaming implements Serializable {
public static <T> EvaluationResult runAndAssertContents(Pipeline p,
PCollection<T> actual,
T[] expected,
+ Duration timeout,
boolean stopGracefully) {
// Because PAssert does not support non-global windowing, but all our data is in one window,
// we set up the assertion directly.
@@ -68,6 +70,7 @@ public final class PAssertStreaming implements Serializable {
// run the pipeline.
EvaluationResult res = (EvaluationResult) p.run();
+ res.waitUntilFinish(timeout);
res.close(stopGracefully);
// validate assertion succeeded (at least once).
int success = res.getAggregatorValue(PAssert.SUCCESS_COUNTER, Integer.class);
@@ -86,8 +89,9 @@ public final class PAssertStreaming implements Serializable {
*/
public static <T> EvaluationResult runAndAssertContents(Pipeline p,
PCollection<T> actual,
- T[] expected) {
- return runAndAssertContents(p, actual, expected, true);
+ T[] expected,
+ Duration timeout) {
+ return runAndAssertContents(p, actual, expected, timeout, true);
}
private static class AssertDoFn<T> extends OldDoFn<Iterable<T>, Void> {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dafd5be7/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/SparkTestPipelineOptionsForStreaming.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/SparkTestPipelineOptionsForStreaming.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/SparkTestPipelineOptionsForStreaming.java
index 1c0b68a..f74c74a 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/SparkTestPipelineOptionsForStreaming.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/SparkTestPipelineOptionsForStreaming.java
@@ -31,7 +31,6 @@ public class SparkTestPipelineOptionsForStreaming extends SparkTestPipelineOptio
@Override
protected void before() throws Throwable {
super.before();
- options.setTimeout(1000L);
options.setStreaming(true);
}
[2/2] incubator-beam git commit: This closes #1393
Posted by am...@apache.org.
This closes #1393
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/875631f0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/875631f0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/875631f0
Branch: refs/heads/master
Commit: 875631f07b1e4556afec28dc850bd7fe2d07444b
Parents: d93e9a8 dafd5be
Author: Sela <an...@paypal.com>
Authored: Sun Nov 20 19:26:52 2016 +0200
Committer: Sela <an...@paypal.com>
Committed: Sun Nov 20 19:26:52 2016 +0200
----------------------------------------------------------------------
.../runners/spark/SparkPipelineOptions.java | 6 --
.../apache/beam/runners/spark/SparkRunner.java | 5 +-
.../spark/translation/EvaluationContext.java | 59 +++++++++++++-------
.../SparkRunnerStreamingContextFactory.java | 3 +-
.../streaming/EmptyStreamAssertionTest.java | 3 +-
.../streaming/FlattenStreamingTest.java | 4 +-
.../streaming/KafkaStreamingTest.java | 11 ++--
.../ResumeFromCheckpointStreamingTest.java | 3 +-
.../streaming/SimpleStreamingWordCountTest.java | 4 +-
.../streaming/utils/PAssertStreaming.java | 8 ++-
.../SparkTestPipelineOptionsForStreaming.java | 1 -
11 files changed, 60 insertions(+), 47 deletions(-)
----------------------------------------------------------------------