Posted to commits@spark.apache.org by td...@apache.org on 2015/05/22 02:41:34 UTC

spark git commit: [SPARK-7776] [STREAMING] Added shutdown hook to StreamingContext

Repository: spark
Updated Branches:
  refs/heads/master 347b50106 -> d68ea24d6


[SPARK-7776] [STREAMING] Added shutdown hook to StreamingContext

A shutdown hook to stop the SparkContext was added recently. This results in ugly errors when a streaming application is terminated with Ctrl-C.

```
Exception in thread "Thread-27" org.apache.spark.SparkException: Job cancelled because SparkContext was shut down
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:736)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:735)
	at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
	at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:735)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1468)
	at org.apache.spark.util.EventLoop.stop(EventLoop.scala:84)
	at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1403)
	at org.apache.spark.SparkContext.stop(SparkContext.scala:1642)
	at org.apache.spark.SparkContext$$anonfun$3.apply$mcV$sp(SparkContext.scala:559)
	at org.apache.spark.util.SparkShutdownHook.run(Utils.scala:2266)
	at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(Utils.scala:2236)
	at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(Utils.scala:2236)
	at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(Utils.scala:2236)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1764)
	at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply$mcV$sp(Utils.scala:2236)
	at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(Utils.scala:2236)
	at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(Utils.scala:2236)
	at scala.util.Try$.apply(Try.scala:161)
	at org.apache.spark.util.SparkShutdownHookManager.runAll(Utils.scala:2236)
	at org.apache.spark.util.SparkShutdownHookManager$$anon$6.run(Utils.scala:2218)
	at org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:54)
```

This is because Spark's shutdown hook stops the SparkContext while streaming jobs are still running, so those jobs fail midway. The correct solution is to stop the StreamingContext before the SparkContext. This PR adds a shutdown hook that does so, registered with a priority higher than the SparkContext shutdown hook's priority.
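
To illustrate the effect from the application side: with this patch, `ssc.start()` registers the shutdown hook automatically, and the new `spark.streaming.stopGracefullyOnShutdown` conf controls whether that hook stops the context gracefully. The driver sketch below is not part of the patch; the app name, master, host, and port are placeholders.

```
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ShutdownHookExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("ShutdownHookExample")                        // placeholder app name
      .setMaster("local[2]")                                    // placeholder master
      .set("spark.streaming.stopGracefullyOnShutdown", "true")  // conf added in this PR

    val ssc = new StreamingContext(conf, Seconds(1))

    // Placeholder source and output; any DStream pipeline behaves the same way.
    val lines = ssc.socketTextStream("localhost", 9999)
    lines.count().print()

    // With this patch, start() registers a shutdown hook that stops the
    // StreamingContext before the SparkContext's own hook runs, so Ctrl-C
    // no longer produces the "SparkContext was shut down" job failures above.
    ssc.start()
    ssc.awaitTermination()
  }
}
```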

Author: Tathagata Das <ta...@gmail.com>

Closes #6307 from tdas/SPARK-7776 and squashes the following commits:

e3d5475 [Tathagata Das] Added conf to specify graceful shutdown
4c18652 [Tathagata Das] Added shutdown hook to StreamingContext.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d68ea24d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d68ea24d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d68ea24d

Branch: refs/heads/master
Commit: d68ea24d60ce1aa55b06a8c107f42544d696eb41
Parents: 347b501
Author: Tathagata Das <ta...@gmail.com>
Authored: Thu May 21 17:41:31 2015 -0700
Committer: Tathagata Das <ta...@gmail.com>
Committed: Thu May 21 17:41:31 2015 -0700

----------------------------------------------------------------------
 .../apache/spark/streaming/StreamingContext.scala | 18 +++++++++++++++++-
 1 file changed, 17 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d68ea24d/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 160fc42..7b77d44 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -42,7 +42,7 @@ import org.apache.spark.streaming.dstream._
 import org.apache.spark.streaming.receiver.{ActorReceiver, ActorSupervisorStrategy, Receiver}
 import org.apache.spark.streaming.scheduler.{JobScheduler, StreamingListener}
 import org.apache.spark.streaming.ui.{StreamingJobProgressListener, StreamingTab}
-import org.apache.spark.util.CallSite
+import org.apache.spark.util.{CallSite, Utils}
 
 /**
  * Main entry point for Spark Streaming functionality. It provides methods used to create
@@ -201,6 +201,8 @@ class StreamingContext private[streaming] (
 
   private val startSite = new AtomicReference[CallSite](null)
 
+  private var shutdownHookRef: AnyRef = _
+
   /**
    * Return the associated Spark context
    */
@@ -584,6 +586,8 @@ class StreamingContext private[streaming] (
           state = StreamingContextState.ACTIVE
           StreamingContext.setActiveContext(this)
         }
+        shutdownHookRef = Utils.addShutdownHook(
+          StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
         logInfo("StreamingContext started")
       case ACTIVE =>
         logWarning("StreamingContext has already been started")
@@ -660,6 +664,9 @@ class StreamingContext private[streaming] (
           uiTab.foreach(_.detach())
           StreamingContext.setActiveContext(null)
           waiter.notifyStop()
+          if (shutdownHookRef != null) {
+            Utils.removeShutdownHook(shutdownHookRef)
+          }
           logInfo("StreamingContext stopped successfully")
       }
       // Even if we have already stopped, we still need to attempt to stop the SparkContext because
@@ -670,6 +677,13 @@ class StreamingContext private[streaming] (
       state = STOPPED
     }
   }
+
+  private def stopOnShutdown(): Unit = {
+    val stopGracefully = conf.getBoolean("spark.streaming.stopGracefullyOnShutdown", false)
+    logInfo(s"Invoking stop(stopGracefully=$stopGracefully) from shutdown hook")
+    // Do not stop SparkContext, let its own shutdown hook stop it
+    stop(stopSparkContext = false, stopGracefully = stopGracefully)
+  }
 }
 
 /**
@@ -685,6 +699,8 @@ object StreamingContext extends Logging {
    */
   private val ACTIVATION_LOCK = new Object()
 
+  private val SHUTDOWN_HOOK_PRIORITY = Utils.SPARK_CONTEXT_SHUTDOWN_PRIORITY + 1
+
   private val activeContext = new AtomicReference[StreamingContext](null)
 
   private def assertNoOtherContextIsActive(): Unit = {
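
The fix relies on Spark's shutdown hook manager running registered hooks in descending priority order, so registering the streaming hook at SPARK_CONTEXT_SHUTDOWN_PRIORITY + 1 makes it run before the SparkContext's own hook. Since Utils.addShutdownHook is Spark-internal, the following is only a plain-Scala sketch of that ordering, with an illustrative priority value rather than the real constant.

```
// Plain-Scala sketch (not Spark's actual Utils code) of priority-ordered
// shutdown hooks: higher priority runs first, which is why the streaming
// hook uses SPARK_CONTEXT_SHUTDOWN_PRIORITY + 1.
object PriorityHookSketch {
  private var hooks = List.empty[(Int, () => Unit)]

  def addShutdownHook(priority: Int)(body: => Unit): Unit =
    hooks = (priority, () => body) :: hooks

  // Run all registered hooks, highest priority first.
  def runAll(): Unit = hooks.sortBy(-_._1).foreach { case (_, hook) => hook() }

  def main(args: Array[String]): Unit = {
    val sparkContextPriority = 50 // illustrative value, not the real constant
    addShutdownHook(sparkContextPriority) { println("stopping SparkContext") }
    addShutdownHook(sparkContextPriority + 1) { println("stopping StreamingContext") }
    runAll() // prints "stopping StreamingContext" before "stopping SparkContext"
  }
}
```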

