Posted to commits@spark.apache.org by sr...@apache.org on 2015/07/31 19:16:49 UTC
spark git commit: [SPARK-9446] Clear Active SparkContext in stop() method
Repository: spark
Updated Branches:
refs/heads/master 04a49edfd -> 27ae851ce
[SPARK-9446] Clear Active SparkContext in stop() method
In the mailing-list thread 'stopped SparkContext remaining active', Andres observed the following in the driver log:
```
15/07/29 15:17:09 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: ApplicationMaster has disassociated: <address removed>
15/07/29 15:17:09 INFO YarnClientSchedulerBackend: Shutting down all executors
Exception in thread "Yarn application state monitor" org.apache.spark.SparkException: Error asking standalone scheduler to shut down executors
    at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.stopExecutors(CoarseGrainedSchedulerBackend.scala:261)
    at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.stop(CoarseGrainedSchedulerBackend.scala:266)
    at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.stop(YarnClientSchedulerBackend.scala:158)
    at org.apache.spark.scheduler.TaskSchedulerImpl.stop(TaskSchedulerImpl.scala:416)
    at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1411)
    at org.apache.spark.SparkContext.stop(SparkContext.scala:1644)
    at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend$$anon$1.run(YarnClientSchedulerBackend.scala:139)
Caused by: java.lang.InterruptedException
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1325)
    at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:208)
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
    at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
    at scala.concurrent.Await$.result(package.scala:190)
15/07/29 15:17:09 INFO YarnClientSchedulerBackend: Asking each executor to shut down
    at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102)
    at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:78)
    at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.stopExecutors(CoarseGrainedSchedulerBackend.scala:257)
    ... 6 more
```
The effect of the above exception is that a stopped SparkContext is returned to the user, since SparkContext.clearActiveContext() is never called.
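
The fix guards each cleanup step in stop() so that a non-fatal failure in one step (such as the SparkException above) no longer aborts the rest of the shutdown sequence, and SparkContext.clearActiveContext() is always reached. A minimal sketch of the idea, using an illustrative guard and step names rather than the real SparkContext internals:
```
import scala.util.control.NonFatal

object StopSketch {
  // Illustrative guard, similar in spirit to Utils.tryLogNonFatalError:
  // run the step and log-and-continue on any non-fatal exception.
  def guard(step: String)(body: => Unit): Unit =
    try body catch {
      case NonFatal(t) => Console.err.println(s"Ignoring failure while stopping $step: $t")
    }

  def clearActiveContext(): Unit = Console.err.println("active context cleared")

  def stop(): Unit = {
    guard("scheduler backend") {
      // Stand-in for the failure seen in the driver log above.
      throw new RuntimeException("Error asking standalone scheduler to shut down executors")
    }
    guard("event logger") { /* stop the event logger, etc. */ }
    // Reached even though an earlier step failed, so a stopped context is
    // never left registered as the active one.
    clearActiveContext()
  }
}
```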
Author: tedyu <yu...@gmail.com>
Closes #7756 from tedyu/master and squashes the following commits:
7339ff2 [tedyu] Move null assignment out of tryLogNonFatalError block
6e02cd9 [tedyu] Use Utils.tryLogNonFatalError to guard resource release
f5fb519 [tedyu] Clear Active SparkContext in stop() method using finally
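
For reference, Utils.tryLogNonFatalError (in org.apache.spark.util.Utils) runs a block and logs, rather than propagates, any non-fatal exception. Roughly, it behaves like the following sketch; the exact signature and log message in Spark may differ:
```
import scala.util.control.NonFatal

object TryLogSketch {
  // Approximate behaviour of Utils.tryLogNonFatalError: execute the block and
  // log any non-fatal exception instead of rethrowing it. NonFatal deliberately
  // excludes errors such as OutOfMemoryError, so truly fatal failures still
  // propagate.
  def tryLogNonFatalError(block: => Unit): Unit = {
    try {
      block
    } catch {
      case NonFatal(t) =>
        System.err.println(s"Uncaught exception in thread ${Thread.currentThread().getName}: $t")
    }
  }
}
```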
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/27ae851c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/27ae851c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/27ae851c
Branch: refs/heads/master
Commit: 27ae851ce16082775ffbcb5b8fc6bdbe65dc70fc
Parents: 04a49ed
Author: tedyu <yu...@gmail.com>
Authored: Fri Jul 31 18:16:55 2015 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Fri Jul 31 18:16:55 2015 +0100
----------------------------------------------------------------------
.../scala/org/apache/spark/SparkContext.scala | 50 +++++++++++++++-----
1 file changed, 37 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/27ae851c/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index ac6ac6c..2d8aa25 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1689,33 +1689,57 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       Utils.removeShutdownHook(_shutdownHookRef)
     }
-    postApplicationEnd()
-    _ui.foreach(_.stop())
+    Utils.tryLogNonFatalError {
+      postApplicationEnd()
+    }
+    Utils.tryLogNonFatalError {
+      _ui.foreach(_.stop())
+    }
     if (env != null) {
-      env.metricsSystem.report()
+      Utils.tryLogNonFatalError {
+        env.metricsSystem.report()
+      }
     }
     if (metadataCleaner != null) {
-      metadataCleaner.cancel()
+      Utils.tryLogNonFatalError {
+        metadataCleaner.cancel()
+      }
+    }
+    Utils.tryLogNonFatalError {
+      _cleaner.foreach(_.stop())
+    }
+    Utils.tryLogNonFatalError {
+      _executorAllocationManager.foreach(_.stop())
     }
-    _cleaner.foreach(_.stop())
-    _executorAllocationManager.foreach(_.stop())
     if (_dagScheduler != null) {
-      _dagScheduler.stop()
+      Utils.tryLogNonFatalError {
+        _dagScheduler.stop()
+      }
       _dagScheduler = null
     }
     if (_listenerBusStarted) {
-      listenerBus.stop()
-      _listenerBusStarted = false
+      Utils.tryLogNonFatalError {
+        listenerBus.stop()
+        _listenerBusStarted = false
+      }
+    }
+    Utils.tryLogNonFatalError {
+      _eventLogger.foreach(_.stop())
     }
-    _eventLogger.foreach(_.stop())
     if (env != null && _heartbeatReceiver != null) {
-      env.rpcEnv.stop(_heartbeatReceiver)
+      Utils.tryLogNonFatalError {
+        env.rpcEnv.stop(_heartbeatReceiver)
+      }
+    }
+    Utils.tryLogNonFatalError {
+      _progressBar.foreach(_.stop())
     }
-    _progressBar.foreach(_.stop())
     _taskScheduler = null
     // TODO: Cache.stop()?
     if (_env != null) {
-      _env.stop()
+      Utils.tryLogNonFatalError {
+        _env.stop()
+      }
       SparkEnv.set(null)
     }
     SparkContext.clearActiveContext()
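
With clearActiveContext() now guaranteed to run, a stopped context is no longer left registered as the active one. A hypothetical usage sketch of the contract this preserves even when an earlier cleanup step throws (local mode, illustrative names only):
```
import org.apache.spark.{SparkConf, SparkContext}

object ClearActiveDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("clear-active-demo").setMaster("local[*]")
    val sc = SparkContext.getOrCreate(conf)
    sc.stop()
    // Because stop() clears the active context, getOrCreate builds a fresh
    // context here instead of handing back the stopped one.
    val sc2 = SparkContext.getOrCreate(conf)
    assert(sc2 ne sc)
    sc2.stop()
  }
}
```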