You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/11/17 20:31:40 UTC

[49/50] [abbrv] beam git commit: [BEAM-3206] Shut down executor when spark runner finishes

[BEAM-3206] Shut down executor when spark runner finishes

The Spark runner previously left the JVM process hanging after
completion because its one-time use executor service was never shut
down. This change shuts down the executor after jobs have been
submitted, allowing graceful JVM termination.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b5b27333
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b5b27333
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b5b27333

Branch: refs/heads/tez-runner
Commit: b5b2733338e7a0d5dd373b7a19bea315b3b1c692
Parents: 0df7ba9
Author: Ben Sidhom <si...@google.com>
Authored: Wed Nov 15 16:05:49 2017 -0800
Committer: Ismaël Mejía <ie...@gmail.com>
Committed: Fri Nov 17 15:15:08 2017 +0100

----------------------------------------------------------------------
 .../src/main/java/org/apache/beam/runners/spark/SparkRunner.java   | 2 ++
 1 file changed, 2 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/b5b27333/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
index 98ca1be..4a409cb 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
@@ -190,6 +190,7 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
                   jssc.start();
                 }
               });
+      executorService.shutdown();
 
       result = new SparkPipelineResult.StreamingMode(startPipeline, jssc);
     } else {
@@ -214,6 +215,7 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
                   LOG.info("Batch pipeline execution complete.");
                 }
               });
+      executorService.shutdown();
 
       result = new SparkPipelineResult.BatchMode(startPipeline, jsc);
     }