You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by td...@apache.org on 2015/06/02 05:05:04 UTC

spark git commit: [SPARK-7958] [STREAMING] Handled exception in StreamingContext.start() to prevent leaking of actors

Repository: spark
Updated Branches:
  refs/heads/master 90c606925 -> 2f9c7519d


[SPARK-7958] [STREAMING] Handled exception in StreamingContext.start() to prevent leaking of actors

StreamingContext.start() can throw exception because DStream.validateAtStart() fails (say, checkpoint directory not set for StateDStream). But by then JobScheduler, JobGenerator, and ReceiverTracker has already started, along with their actors. But those cannot be shutdown because the only way to do that is call StreamingContext.stop() which cannot be called as the context has not been marked as ACTIVE.

The solution in this PR is to stop the internal scheduler if start throw exception, and mark the context as STOPPED.

Author: Tathagata Das <ta...@gmail.com>

Closes #6559 from tdas/SPARK-7958 and squashes the following commits:

20b2ec1 [Tathagata Das] Added synchronized
790b617 [Tathagata Das] Handled exception in StreamingContext.start()


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2f9c7519
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2f9c7519
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2f9c7519

Branch: refs/heads/master
Commit: 2f9c7519d6a3f867100979b5e7ced3f72b7d9adc
Parents: 90c6069
Author: Tathagata Das <ta...@gmail.com>
Authored: Mon Jun 1 20:04:57 2015 -0700
Committer: Tathagata Das <ta...@gmail.com>
Committed: Mon Jun 1 20:04:57 2015 -0700

----------------------------------------------------------------------
 .../apache/spark/streaming/StreamingContext.scala  | 17 +++++++++++++----
 .../spark/streaming/scheduler/JobScheduler.scala   |  4 ++++
 .../spark/streaming/StreamingContextSuite.scala    | 16 ++++++++++++++++
 3 files changed, 33 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2f9c7519/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 25842d5..624a31d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -23,6 +23,7 @@ import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
 import scala.collection.Map
 import scala.collection.mutable.Queue
 import scala.reflect.ClassTag
+import scala.util.control.NonFatal
 
 import akka.actor.{Props, SupervisorStrategy}
 import org.apache.hadoop.conf.Configuration
@@ -576,18 +577,26 @@ class StreamingContext private[streaming] (
   def start(): Unit = synchronized {
     state match {
       case INITIALIZED =>
-        validate()
         startSite.set(DStream.getCreationSite())
         sparkContext.setCallSite(startSite.get)
         StreamingContext.ACTIVATION_LOCK.synchronized {
           StreamingContext.assertNoOtherContextIsActive()
-          scheduler.start()
-          uiTab.foreach(_.attach())
-          state = StreamingContextState.ACTIVE
+          try {
+            validate()
+            scheduler.start()
+            state = StreamingContextState.ACTIVE
+          } catch {
+            case NonFatal(e) =>
+              logError("Error starting the context, marking it as stopped", e)
+              scheduler.stop(false)
+              state = StreamingContextState.STOPPED
+              throw e
+          }
           StreamingContext.setActiveContext(this)
         }
         shutdownHookRef = Utils.addShutdownHook(
           StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
+        uiTab.foreach(_.attach())
         logInfo("StreamingContext started")
       case ACTIVE =>
         logWarning("StreamingContext has already been started")

http://git-wip-us.apache.org/repos/asf/spark/blob/2f9c7519/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 1d1ddaa..4af9b6d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -126,6 +126,10 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
     eventLoop.post(ErrorReported(msg, e))
   }
 
+  def isStarted(): Boolean = synchronized {
+    eventLoop != null
+  }
+
   private def processEvent(event: JobSchedulerEvent) {
     try {
       event match {

http://git-wip-us.apache.org/repos/asf/spark/blob/2f9c7519/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index d304c9a..819dd2c 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -151,6 +151,22 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo
     assert(StreamingContext.getActive().isEmpty)
   }
 
+  test("start failure should stop internal components") {
+    ssc = new StreamingContext(conf, batchDuration)
+    val inputStream = addInputStream(ssc)
+    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
+      Some(values.sum + state.getOrElse(0))
+    }
+    inputStream.map(x => (x, 1)).updateStateByKey[Int](updateFunc)
+    // Require that the start fails because checkpoint directory was not set
+    intercept[Exception] {
+      ssc.start()
+    }
+    assert(ssc.getState() === StreamingContextState.STOPPED)
+    assert(ssc.scheduler.isStarted === false)
+  }
+
+
   test("start multiple times") {
     ssc = new StreamingContext(master, appName, batchDuration)
     addInputStream(ssc).register()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org