You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by am...@apache.org on 2016/11/20 17:47:39 UTC

[1/2] incubator-beam git commit: [Beam-1001] Add non blocking cancel() and waitUntilFinish() for streaming applications. remove timeout parameer in spark pipeline option.

Repository: incubator-beam
Updated Branches:
  refs/heads/master d93e9a88b -> 875631f07


[Beam-1001] Add non blocking cancel() and waitUntilFinish() for streaming applications.
remove timeout parameer in spark pipeline option.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/dafd5be7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/dafd5be7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/dafd5be7

Branch: refs/heads/master
Commit: dafd5be7f69f191fc9edb8b9f9aec010ca368f50
Parents: d93e9a8
Author: ksalant <ks...@payapal.com>
Authored: Sun Nov 20 11:57:16 2016 +0200
Committer: Sela <an...@paypal.com>
Committed: Sun Nov 20 19:25:52 2016 +0200

----------------------------------------------------------------------
 .../runners/spark/SparkPipelineOptions.java     |  6 --
 .../apache/beam/runners/spark/SparkRunner.java  |  5 +-
 .../spark/translation/EvaluationContext.java    | 59 +++++++++++++-------
 .../SparkRunnerStreamingContextFactory.java     |  3 +-
 .../streaming/EmptyStreamAssertionTest.java     |  3 +-
 .../streaming/FlattenStreamingTest.java         |  4 +-
 .../streaming/KafkaStreamingTest.java           | 11 ++--
 .../ResumeFromCheckpointStreamingTest.java      |  3 +-
 .../streaming/SimpleStreamingWordCountTest.java |  4 +-
 .../streaming/utils/PAssertStreaming.java       |  8 ++-
 .../SparkTestPipelineOptionsForStreaming.java   |  1 -
 11 files changed, 60 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dafd5be7/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
index b1ebde9..0fd790e 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
@@ -39,12 +39,6 @@ public interface SparkPipelineOptions
   String getSparkMaster();
   void setSparkMaster(String master);
 
-  @Description("Timeout to wait (in msec) for a streaming execution to stop, -1 runs until "
-          + "execution is stopped")
-  @Default.Long(-1)
-  Long getTimeout();
-  void setTimeout(Long timeoutMillis);
-
   @Description("Batch interval for Spark streaming in milliseconds.")
   @Default.Long(1000)
   Long getBatchIntervalMillis();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dafd5be7/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
index 6bbef39..e800071 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
@@ -137,11 +137,8 @@ public final class SparkRunner extends PipelineRunner<EvaluationResult> {
 
         // if recovering from checkpoint, we have to reconstruct the EvaluationResult instance.
         return contextFactory.getCtxt() == null ? new EvaluationContext(jssc.sc(),
-            pipeline, jssc, mOptions.getTimeout()) : contextFactory.getCtxt();
+            pipeline, jssc) : contextFactory.getCtxt();
       } else {
-        if (mOptions.getTimeout() > 0) {
-          LOG.info("Timeout is ignored by the SparkRunner in batch.");
-        }
         JavaSparkContext jsc = SparkContextFactory.getSparkContext(mOptions);
         EvaluationContext ctxt = new EvaluationContext(jsc, pipeline);
         SparkPipelineTranslator translator = new TransformTranslator.Translator();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dafd5be7/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
index aaf7573..1183fbb 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
@@ -45,6 +45,7 @@ import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.PValue;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.streaming.StreamingContextState;
 import org.apache.spark.streaming.api.java.JavaStreamingContext;
 import org.joda.time.Duration;
 
@@ -57,7 +58,6 @@ public class EvaluationContext implements EvaluationResult {
   private JavaStreamingContext jssc;
   private final SparkRuntimeContext runtime;
   private final Pipeline pipeline;
-  private long timeout;
   private final Map<PValue, Dataset> datasets = new LinkedHashMap<>();
   private final Map<PValue, Dataset> pcollections = new LinkedHashMap<>();
   private final Set<Dataset> leaves = new LinkedHashSet<>();
@@ -76,10 +76,9 @@ public class EvaluationContext implements EvaluationResult {
   }
 
   public EvaluationContext(JavaSparkContext jsc, Pipeline pipeline,
-                           JavaStreamingContext jssc, long timeout) {
+                           JavaStreamingContext jssc) {
     this(jsc, pipeline);
     this.jssc = jssc;
-    this.timeout = timeout;
     this.state = State.RUNNING;
   }
 
@@ -226,18 +225,14 @@ public class EvaluationContext implements EvaluationResult {
 
   @Override
   public void close(boolean gracefully) {
-    if (isStreamingPipeline()) {
-      // stop streaming context
-      if (timeout > 0) {
-        jssc.awaitTerminationOrTimeout(timeout);
-      } else {
-        jssc.awaitTermination();
+    // Stopping streaming job if running
+    if (isStreamingPipeline() && !state.isTerminal()) {
+      try {
+        cancel(gracefully);
+      } catch (IOException e) {
+        throw new RuntimeException("Failed to cancel streaming job", e);
       }
-      // stop streaming context gracefully, so checkpointing (and other computations) get to
-      // finish before shutdown.
-      jssc.stop(false, gracefully);
     }
-    state = State.DONE;
     SparkContextFactory.stopSparkContext(jsc);
   }
 
@@ -248,21 +243,47 @@ public class EvaluationContext implements EvaluationResult {
 
   @Override
   public State cancel() throws IOException {
-    throw new UnsupportedOperationException(
-        "Spark runner EvaluationContext does not support cancel.");
+    return cancel(true);
+  }
+
+  private State cancel(boolean gracefully) throws IOException {
+    if (isStreamingPipeline()) {
+      if (!state.isTerminal()) {
+        jssc.stop(false, gracefully);
+        state = State.CANCELLED;
+      }
+      return state;
+    } else {
+      // Batch is currently blocking so
+      // there is no way to cancel a batch job
+      // will be handled at BEAM-1000
+      throw new UnsupportedOperationException(
+          "Spark runner EvaluationContext does not support cancel.");
+    }
   }
 
   @Override
   public State waitUntilFinish() {
-    return waitUntilFinish(Duration.millis(-1));
+    return waitUntilFinish(Duration.ZERO);
   }
 
   @Override
   public State waitUntilFinish(Duration duration) {
     if (isStreamingPipeline()) {
-      throw new UnsupportedOperationException(
-          "Spark runner EvaluationContext does not support waitUntilFinish for streaming "
-              + "pipelines.");
+      // According to PipelineResult: Provide a value less than 1 ms for an infinite wait
+      if (duration.getMillis() < 1L) {
+        jssc.awaitTermination();
+        state = State.DONE;
+      } else {
+        jssc.awaitTermination(duration.getMillis());
+        // According to PipelineResult: The final state of the pipeline or null on timeout
+        if (jssc.getState().equals(StreamingContextState.STOPPED)) {
+          state = State.DONE;
+        } else {
+          return null;
+        }
+      }
+      return state;
     } else {
       // This is no-op, since Spark runner in batch is blocking.
       // It needs to be updated once SparkRunner supports non-blocking execution:

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dafd5be7/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
index 01398e4..af90ff1 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
@@ -73,8 +73,7 @@ public class SparkRunnerStreamingContextFactory implements JavaStreamingContextF
 
     JavaSparkContext jsc = SparkContextFactory.getSparkContext(options);
     JavaStreamingContext jssc = new JavaStreamingContext(jsc, batchDuration);
-    ctxt = new EvaluationContext(jsc, pipeline, jssc,
-        options.getTimeout());
+    ctxt = new EvaluationContext(jsc, pipeline, jssc);
     pipeline.traverseTopologically(new SparkRunner.Evaluator(translator, ctxt));
     ctxt.computeOutputs();
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dafd5be7/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java
index ec75eb7..d40bcff 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java
@@ -68,7 +68,8 @@ public class EmptyStreamAssertionTest implements Serializable {
             .apply(Window.<String>into(FixedWindows.of(windowDuration)));
 
     try {
-      PAssertStreaming.runAndAssertContents(pipeline, output, new String[0]);
+      PAssertStreaming.runAndAssertContents(pipeline, output, new String[0],
+          Duration.standardSeconds(1L));
     } catch (AssertionError e) {
       assertTrue("Expected error message: " + EXPECTED_ERR + " but got: " + e.getMessage(),
           e.getMessage().equals(EXPECTED_ERR));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dafd5be7/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
index f69bd7f..3e75b18 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
@@ -76,7 +76,7 @@ public class FlattenStreamingTest {
     PCollectionList<String> list = PCollectionList.of(windowedW1).and(windowedW2);
     PCollection<String> union = list.apply(Flatten.<String>pCollections());
 
-    PAssertStreaming.runAndAssertContents(p, union, EXPECTED_UNION);
+    PAssertStreaming.runAndAssertContents(p, union, EXPECTED_UNION, Duration.standardSeconds(1L));
   }
 
   @Test
@@ -95,7 +95,7 @@ public class FlattenStreamingTest {
     PCollectionList<String> list = PCollectionList.of(windowedW1).and(windowedW2);
     PCollection<String> union = list.apply(Flatten.<String>pCollections());
 
-    PAssertStreaming.runAndAssertContents(p, union, EXPECTED_UNION);
+    PAssertStreaming.runAndAssertContents(p, union, EXPECTED_UNION, Duration.standardSeconds(1L));
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dafd5be7/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
index 6b2486b..d55ed39 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
@@ -118,7 +118,7 @@ public class KafkaStreamingTest {
         .apply(ParDo.of(new FormatKVFn()))
         .apply(Distinct.<String>create());
 
-    PAssertStreaming.runAndAssertContents(p, deduped, expected);
+    PAssertStreaming.runAndAssertContents(p, deduped, expected, Duration.standardSeconds(1L));
   }
 
   @Test
@@ -143,10 +143,6 @@ public class KafkaStreamingTest {
     // It seems that the consumer's first "position" lookup (in unit test) takes +200 msec,
     // so to be on the safe side we'll set to 750 msec.
     options.setMinReadTimeMillis(750L);
-    // run for more than 1 batch interval, so that reading of latest is attempted in the
-    // first batch with no luck, while the OnBatchCompleted injected-input afterwards will be read
-    // in the second interval.
-    options.setTimeout(Duration.standardSeconds(3).getMillis());
 
     //------- test: read and format.
     Pipeline p = Pipeline.create(options);
@@ -168,7 +164,10 @@ public class KafkaStreamingTest {
         .apply(Window.<KV<String, String>>into(FixedWindows.of(batchAndWindowDuration)))
         .apply(ParDo.of(new FormatKVFn()));
 
-    PAssertStreaming.runAndAssertContents(p, formatted, expected);
+    // run for more than 1 batch interval, so that reading of latest is attempted in the
+    // first batch with no luck, while the OnBatchCompleted injected-input afterwards will be read
+    // in the second interval.
+    PAssertStreaming.runAndAssertContents(p, formatted, expected, Duration.standardSeconds(3));
   }
 
   private static void produce(String topic, Map<String, String> messages) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dafd5be7/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
index af93d84..b57787f 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
@@ -163,7 +163,8 @@ public class ResumeFromCheckpointStreamingTest {
 
     // requires a graceful stop so that checkpointing of the first run would finish successfully
     // before stopping and attempting to resume.
-    return PAssertStreaming.runAndAssertContents(p, formattedKV, EXPECTED);
+    return PAssertStreaming.runAndAssertContents(p, formattedKV, EXPECTED,
+            Duration.standardSeconds(1L));
   }
 
   @AfterClass

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dafd5be7/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
index 4c503c4..9a15ff2 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
@@ -67,8 +67,6 @@ public class SimpleStreamingWordCountTest implements Serializable {
 
     // override defaults
     options.setBatchIntervalMillis(BATCH_INTERVAL.getMillis());
-    // graceful stop is on, so no worries about the timeout and window being equal
-    options.setTimeout(windowDuration.getMillis());
 
     Pipeline pipeline = Pipeline.create(options);
 
@@ -80,6 +78,6 @@ public class SimpleStreamingWordCountTest implements Serializable {
             .apply(new WordCount.CountWords())
             .apply(MapElements.via(new WordCount.FormatAsTextFn()));
 
-    PAssertStreaming.runAndAssertContents(pipeline, output, EXPECTED_WORD_COUNTS);
+    PAssertStreaming.runAndAssertContents(pipeline, output, EXPECTED_WORD_COUNTS, windowDuration);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dafd5be7/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java
index 496735d..23aca43 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java
@@ -34,6 +34,7 @@ import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.Values;
 import org.apache.beam.sdk.transforms.WithKeys;
 import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Duration;
 import org.junit.Assert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -57,6 +58,7 @@ public final class PAssertStreaming implements Serializable {
   public static <T> EvaluationResult runAndAssertContents(Pipeline p,
                                                           PCollection<T> actual,
                                                           T[] expected,
+                                                          Duration timeout,
                                                           boolean stopGracefully) {
     // Because PAssert does not support non-global windowing, but all our data is in one window,
     // we set up the assertion directly.
@@ -68,6 +70,7 @@ public final class PAssertStreaming implements Serializable {
 
     // run the pipeline.
     EvaluationResult res = (EvaluationResult) p.run();
+    res.waitUntilFinish(timeout);
     res.close(stopGracefully);
     // validate assertion succeeded (at least once).
     int success = res.getAggregatorValue(PAssert.SUCCESS_COUNTER, Integer.class);
@@ -86,8 +89,9 @@ public final class PAssertStreaming implements Serializable {
    */
   public static <T> EvaluationResult runAndAssertContents(Pipeline p,
                                                           PCollection<T> actual,
-                                                          T[] expected) {
-    return runAndAssertContents(p, actual, expected, true);
+                                                          T[] expected,
+                                                          Duration timeout) {
+    return runAndAssertContents(p, actual, expected, timeout, true);
   }
 
   private static class AssertDoFn<T> extends OldDoFn<Iterable<T>, Void> {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dafd5be7/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/SparkTestPipelineOptionsForStreaming.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/SparkTestPipelineOptionsForStreaming.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/SparkTestPipelineOptionsForStreaming.java
index 1c0b68a..f74c74a 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/SparkTestPipelineOptionsForStreaming.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/SparkTestPipelineOptionsForStreaming.java
@@ -31,7 +31,6 @@ public class SparkTestPipelineOptionsForStreaming extends SparkTestPipelineOptio
   @Override
   protected void before() throws Throwable {
     super.before();
-    options.setTimeout(1000L);
     options.setStreaming(true);
   }
 


[2/2] incubator-beam git commit: This closes #1393

Posted by am...@apache.org.
This closes #1393


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/875631f0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/875631f0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/875631f0

Branch: refs/heads/master
Commit: 875631f07b1e4556afec28dc850bd7fe2d07444b
Parents: d93e9a8 dafd5be
Author: Sela <an...@paypal.com>
Authored: Sun Nov 20 19:26:52 2016 +0200
Committer: Sela <an...@paypal.com>
Committed: Sun Nov 20 19:26:52 2016 +0200

----------------------------------------------------------------------
 .../runners/spark/SparkPipelineOptions.java     |  6 --
 .../apache/beam/runners/spark/SparkRunner.java  |  5 +-
 .../spark/translation/EvaluationContext.java    | 59 +++++++++++++-------
 .../SparkRunnerStreamingContextFactory.java     |  3 +-
 .../streaming/EmptyStreamAssertionTest.java     |  3 +-
 .../streaming/FlattenStreamingTest.java         |  4 +-
 .../streaming/KafkaStreamingTest.java           | 11 ++--
 .../ResumeFromCheckpointStreamingTest.java      |  3 +-
 .../streaming/SimpleStreamingWordCountTest.java |  4 +-
 .../streaming/utils/PAssertStreaming.java       |  8 ++-
 .../SparkTestPipelineOptionsForStreaming.java   |  1 -
 11 files changed, 60 insertions(+), 47 deletions(-)
----------------------------------------------------------------------