Posted to commits@spark.apache.org by zs...@apache.org on 2016/11/08 21:14:59 UTC

spark git commit: [SPARK-18280][CORE] Fix potential deadlock in `StandaloneSchedulerBackend.dead`

Repository: spark
Updated Branches:
  refs/heads/master 26e1c53ac -> b6de0c98c


[SPARK-18280][CORE] Fix potential deadlock in `StandaloneSchedulerBackend.dead`

## What changes were proposed in this pull request?

"StandaloneSchedulerBackend.dead" is called in a RPC thread, so it should not call "SparkContext.stop" in the same thread. "SparkContext.stop" will block until all RPC threads exit, if it's called inside a RPC thread, it will be dead-lock.

This PR adds a thread-local flag that is set inside RPC threads. `SparkContext.stop` checks it to decide whether to launch a new thread to stop the SparkContext.
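
A hedged sketch of the idea: `rpcThreadFlag` and the `stop`/new-thread dispatch mirror the patch below, while `RpcThreadAwareStop`, `messageLoop`, and `doStop` are illustrative scaffolding, not Spark API.

```scala
import scala.util.DynamicVariable

object RpcThreadAwareStop {
  // One value per thread: DynamicVariable is backed by an
  // InheritableThreadLocal, so each thread sees its own flag.
  private val rpcThreadFlag = new DynamicVariable[Boolean](false)

  // Hypothetical message loop: each RPC dispatcher thread marks itself
  // on startup, as the Dispatcher.MessageLoop change below does.
  def messageLoop(): Unit = {
    rpcThreadFlag.value = true
    // ... dispatch incoming RPC messages ...
  }

  def stop(): Unit = {
    if (rpcThreadFlag.value) {
      // On an RPC thread: hand the blocking shutdown to a fresh daemon
      // thread so this RPC thread can exit and be waited on.
      val stopper = new Thread("stop-spark-context") {
        override def run(): Unit = doStop()
      }
      stopper.setDaemon(true)
      stopper.start()
    } else {
      doStop()
    }
  }

  private def doStop(): Unit = {
    // Blocking shutdown that waits for all RPC threads to exit.
  }
}
```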

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <sh...@databricks.com>

Closes #15775 from zsxwing/SPARK-18280.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b6de0c98
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b6de0c98
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b6de0c98

Branch: refs/heads/master
Commit: b6de0c98c70960a97b07615b0b08fbd8f900fbe7
Parents: 26e1c53
Author: Shixiong Zhu <sh...@databricks.com>
Authored: Tue Nov 8 13:14:56 2016 -0800
Committer: Shixiong Zhu <sh...@databricks.com>
Committed: Tue Nov 8 13:14:56 2016 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   | 22 ++++++++++++++++++--
 .../scala/org/apache/spark/rpc/RpcEnv.scala     |  4 ++++
 .../org/apache/spark/rpc/netty/Dispatcher.scala |  1 +
 .../apache/spark/rpc/netty/NettyRpcEnv.scala    |  3 +++
 .../org/apache/spark/rpc/RpcEnvSuite.scala      | 13 ++++++++++++
 5 files changed, 41 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b6de0c98/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 9f0f607..25a3d60 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1757,8 +1757,26 @@ class SparkContext(config: SparkConf) extends Logging {
    */
   def listJars(): Seq[String] = addedJars.keySet.toSeq
 
-  // Shut down the SparkContext.
-  def stop() {
+  /**
+   * Shut down the SparkContext.
+   */
+  def stop(): Unit = {
+    if (env.rpcEnv.isInRPCThread) {
+      // `stop` will block until all RPC threads exit, so we cannot call stop inside a RPC thread.
+      // We should launch a new thread to call `stop` to avoid dead-lock.
+      new Thread("stop-spark-context") {
+        setDaemon(true)
+
+        override def run(): Unit = {
+          _stop()
+        }
+      }.start()
+    } else {
+      _stop()
+    }
+  }
+
+  private def _stop() {
     if (LiveListenerBus.withinListenerThread.value) {
       throw new SparkException(
         s"Cannot stop SparkContext within listener thread of ${LiveListenerBus.name}")

http://git-wip-us.apache.org/repos/asf/spark/blob/b6de0c98/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
index 5791228..bbc4163 100644
--- a/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
@@ -147,6 +147,10 @@ private[spark] abstract class RpcEnv(conf: SparkConf) {
    */
   def openChannel(uri: String): ReadableByteChannel
 
+  /**
+   * Return if the current thread is a RPC thread.
+   */
+  def isInRPCThread: Boolean
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/b6de0c98/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala b/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
index a02cf30..67baabd 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
@@ -201,6 +201,7 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) extends Logging {
   /** Message loop used for dispatching messages. */
   private class MessageLoop extends Runnable {
     override def run(): Unit = {
+      NettyRpcEnv.rpcThreadFlag.value = true
       try {
         while (true) {
           try {

http://git-wip-us.apache.org/repos/asf/spark/blob/b6de0c98/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
index e51649a..0b8cd14 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
@@ -408,10 +408,13 @@ private[netty] class NettyRpcEnv(
 
   }
 
+  override def isInRPCThread: Boolean = NettyRpcEnv.rpcThreadFlag.value
 }
 
 private[netty] object NettyRpcEnv extends Logging {
 
+  private[netty] val rpcThreadFlag = new DynamicVariable[Boolean](false)
+
   /**
    * When deserializing the [[NettyRpcEndpointRef]], it needs a reference to [[NettyRpcEnv]].
    * Use `currentEnv` to wrap the deserialization codes. E.g.,

http://git-wip-us.apache.org/repos/asf/spark/blob/b6de0c98/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
index acdf21d..aa07059 100644
--- a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
@@ -870,6 +870,19 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
     verify(endpoint, never()).onDisconnected(any())
     verify(endpoint, never()).onNetworkError(any(), any())
   }
+
+  test("isInRPCThread") {
+    val rpcEndpointRef = env.setupEndpoint("isInRPCThread", new RpcEndpoint {
+      override val rpcEnv = env
+
+      override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
+        case m => context.reply(rpcEnv.isInRPCThread)
+      }
+    })
+    assert(rpcEndpointRef.askWithRetry[Boolean]("hello") === true)
+    assert(env.isInRPCThread === false)
+    env.stop(rpcEndpointRef)
+  }
 }
 
 class UnserializableClass

