You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by td...@apache.org on 2015/10/16 22:56:55 UTC
spark git commit: [SPARK-11104] [STREAMING] Fix a deadlock in
StreamingContext.stop
Repository: spark
Updated Branches:
refs/heads/master 369d786f5 -> e1eef248f
[SPARK-11104] [STREAMING] Fix a deadlock in StreamingContext.stop
The following deadlock may happen if the shutdown hook and StreamingContext.stop are running at the same time.
```
Java stack information for the threads listed above:
===================================================
"Thread-2":
at org.apache.spark.streaming.StreamingContext.stop(StreamingContext.scala:699)
- waiting to lock <0x00000005405a1680> (a org.apache.spark.streaming.StreamingContext)
at org.apache.spark.streaming.StreamingContext.org$apache$spark$streaming$StreamingContext$$stopOnShutdown(StreamingContext.scala:729)
at org.apache.spark.streaming.StreamingContext$$anonfun$start$1.apply$mcV$sp(StreamingContext.scala:625)
at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:266)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ShutdownHookManager.scala:236)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:236)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:236)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1697)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply$mcV$sp(ShutdownHookManager.scala:236)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:236)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:236)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:236)
- locked <0x00000005405b6a00> (a org.apache.spark.util.SparkShutdownHookManager)
at org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:216)
at org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:54)
"main":
at org.apache.spark.util.SparkShutdownHookManager.remove(ShutdownHookManager.scala:248)
- waiting to lock <0x00000005405b6a00> (a org.apache.spark.util.SparkShutdownHookManager)
at org.apache.spark.util.ShutdownHookManager$.removeShutdownHook(ShutdownHookManager.scala:199)
at org.apache.spark.streaming.StreamingContext.stop(StreamingContext.scala:712)
- locked <0x00000005405a1680> (a org.apache.spark.streaming.StreamingContext)
at org.apache.spark.streaming.StreamingContext.stop(StreamingContext.scala:684)
- locked <0x00000005405a1680> (a org.apache.spark.streaming.StreamingContext)
at org.apache.spark.streaming.SessionByKeyBenchmark$.main(SessionByKeyBenchmark.scala:108)
at org.apache.spark.streaming.SessionByKeyBenchmark.main(SessionByKeyBenchmark.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:680)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
```
This PR moves the call to `ShutdownHookManager.removeShutdownHook` out of the `synchronized` block to avoid the deadlock.
Author: zsxwing <zs...@gmail.com>
Closes #9116 from zsxwing/stop-deadlock.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e1eef248
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e1eef248
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e1eef248
Branch: refs/heads/master
Commit: e1eef248f13f6c334fe4eea8a29a1de5470a2e62
Parents: 369d786
Author: zsxwing <zs...@gmail.com>
Authored: Fri Oct 16 13:56:51 2015 -0700
Committer: Tathagata Das <ta...@gmail.com>
Committed: Fri Oct 16 13:56:51 2015 -0700
----------------------------------------------------------------------
.../spark/streaming/StreamingContext.scala | 55 +++++++++++---------
1 file changed, 31 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/e1eef248/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 9b2632c..051f53d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -694,32 +694,39 @@ class StreamingContext private[streaming] (
* @param stopGracefully if true, stops gracefully by waiting for the processing of all
* received data to be completed
*/
- def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit = synchronized {
- try {
- state match {
- case INITIALIZED =>
- logWarning("StreamingContext has not been started yet")
- case STOPPED =>
- logWarning("StreamingContext has already been stopped")
- case ACTIVE =>
- scheduler.stop(stopGracefully)
- // Removing the streamingSource to de-register the metrics on stop()
- env.metricsSystem.removeSource(streamingSource)
- uiTab.foreach(_.detach())
- StreamingContext.setActiveContext(null)
- waiter.notifyStop()
- if (shutdownHookRef != null) {
- ShutdownHookManager.removeShutdownHook(shutdownHookRef)
- }
- logInfo("StreamingContext stopped successfully")
+ def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit = {
+ var shutdownHookRefToRemove: AnyRef = null
+ synchronized {
+ try {
+ state match {
+ case INITIALIZED =>
+ logWarning("StreamingContext has not been started yet")
+ case STOPPED =>
+ logWarning("StreamingContext has already been stopped")
+ case ACTIVE =>
+ scheduler.stop(stopGracefully)
+ // Removing the streamingSource to de-register the metrics on stop()
+ env.metricsSystem.removeSource(streamingSource)
+ uiTab.foreach(_.detach())
+ StreamingContext.setActiveContext(null)
+ waiter.notifyStop()
+ if (shutdownHookRef != null) {
+ shutdownHookRefToRemove = shutdownHookRef
+ shutdownHookRef = null
+ }
+ logInfo("StreamingContext stopped successfully")
+ }
+ } finally {
+ // The state should always be Stopped after calling `stop()`, even if we haven't started yet
+ state = STOPPED
}
- // Even if we have already stopped, we still need to attempt to stop the SparkContext because
- // a user might stop(stopSparkContext = false) and then call stop(stopSparkContext = true).
- if (stopSparkContext) sc.stop()
- } finally {
- // The state should always be Stopped after calling `stop()`, even if we haven't started yet
- state = STOPPED
}
+ if (shutdownHookRefToRemove != null) {
+ ShutdownHookManager.removeShutdownHook(shutdownHookRefToRemove)
+ }
+ // Even if we have already stopped, we still need to attempt to stop the SparkContext because
+ // a user might stop(stopSparkContext = false) and then call stop(stopSparkContext = true).
+ if (stopSparkContext) sc.stop()
}
private def stopOnShutdown(): Unit = {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org