You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by zs...@apache.org on 2016/11/08 21:14:59 UTC
spark git commit: [SPARK-18280][CORE] Fix potential deadlock in
`StandaloneSchedulerBackend.dead`
Repository: spark
Updated Branches:
refs/heads/master 26e1c53ac -> b6de0c98c
[SPARK-18280][CORE] Fix potential deadlock in `StandaloneSchedulerBackend.dead`
## What changes were proposed in this pull request?
"StandaloneSchedulerBackend.dead" is called in an RPC thread, so it should not call "SparkContext.stop" in the same thread. "SparkContext.stop" will block until all RPC threads exit; if it's called inside an RPC thread, it will deadlock.
This PR adds a thread-local flag inside RPC threads. `SparkContext.stop` uses it to decide whether to launch a new thread to stop the SparkContext.
## How was this patch tested?
Jenkins
Author: Shixiong Zhu <sh...@databricks.com>
Closes #15775 from zsxwing/SPARK-18280.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b6de0c98
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b6de0c98
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b6de0c98
Branch: refs/heads/master
Commit: b6de0c98c70960a97b07615b0b08fbd8f900fbe7
Parents: 26e1c53
Author: Shixiong Zhu <sh...@databricks.com>
Authored: Tue Nov 8 13:14:56 2016 -0800
Committer: Shixiong Zhu <sh...@databricks.com>
Committed: Tue Nov 8 13:14:56 2016 -0800
----------------------------------------------------------------------
.../scala/org/apache/spark/SparkContext.scala | 22 ++++++++++++++++++--
.../scala/org/apache/spark/rpc/RpcEnv.scala | 4 ++++
.../org/apache/spark/rpc/netty/Dispatcher.scala | 1 +
.../apache/spark/rpc/netty/NettyRpcEnv.scala | 3 +++
.../org/apache/spark/rpc/RpcEnvSuite.scala | 13 ++++++++++++
5 files changed, 41 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/b6de0c98/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 9f0f607..25a3d60 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1757,8 +1757,26 @@ class SparkContext(config: SparkConf) extends Logging {
*/
def listJars(): Seq[String] = addedJars.keySet.toSeq
- // Shut down the SparkContext.
- def stop() {
+ /**
+ * Shut down the SparkContext.
+ */
+ def stop(): Unit = {
+ if (env.rpcEnv.isInRPCThread) {
+ // `stop` will block until all RPC threads exit, so we cannot call stop inside a RPC thread.
+ // We should launch a new thread to call `stop` to avoid dead-lock.
+ new Thread("stop-spark-context") {
+ setDaemon(true)
+
+ override def run(): Unit = {
+ _stop()
+ }
+ }.start()
+ } else {
+ _stop()
+ }
+ }
+
+ private def _stop() {
if (LiveListenerBus.withinListenerThread.value) {
throw new SparkException(
s"Cannot stop SparkContext within listener thread of ${LiveListenerBus.name}")
http://git-wip-us.apache.org/repos/asf/spark/blob/b6de0c98/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
index 5791228..bbc4163 100644
--- a/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
@@ -147,6 +147,10 @@ private[spark] abstract class RpcEnv(conf: SparkConf) {
*/
def openChannel(uri: String): ReadableByteChannel
+ /**
+ * Return if the current thread is a RPC thread.
+ */
+ def isInRPCThread: Boolean
}
/**
http://git-wip-us.apache.org/repos/asf/spark/blob/b6de0c98/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala b/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
index a02cf30..67baabd 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
@@ -201,6 +201,7 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) extends Logging {
/** Message loop used for dispatching messages. */
private class MessageLoop extends Runnable {
override def run(): Unit = {
+ NettyRpcEnv.rpcThreadFlag.value = true
try {
while (true) {
try {
http://git-wip-us.apache.org/repos/asf/spark/blob/b6de0c98/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
index e51649a..0b8cd14 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
@@ -408,10 +408,13 @@ private[netty] class NettyRpcEnv(
}
+ override def isInRPCThread: Boolean = NettyRpcEnv.rpcThreadFlag.value
}
private[netty] object NettyRpcEnv extends Logging {
+ private[netty] val rpcThreadFlag = new DynamicVariable[Boolean](false)
+
/**
* When deserializing the [[NettyRpcEndpointRef]], it needs a reference to [[NettyRpcEnv]].
* Use `currentEnv` to wrap the deserialization codes. E.g.,
http://git-wip-us.apache.org/repos/asf/spark/blob/b6de0c98/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
index acdf21d..aa07059 100644
--- a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
@@ -870,6 +870,19 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
verify(endpoint, never()).onDisconnected(any())
verify(endpoint, never()).onNetworkError(any(), any())
}
+
+ test("isInRPCThread") {
+ val rpcEndpointRef = env.setupEndpoint("isInRPCThread", new RpcEndpoint {
+ override val rpcEnv = env
+
+ override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
+ case m => context.reply(rpcEnv.isInRPCThread)
+ }
+ })
+ assert(rpcEndpointRef.askWithRetry[Boolean]("hello") === true)
+ assert(env.isInRPCThread === false)
+ env.stop(rpcEndpointRef)
+ }
}
class UnserializableClass
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org