You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by zs...@apache.org on 2017/03/08 21:03:38 UTC
spark git commit: [SPARK-19481] [REPL] [MAVEN] Avoid leaking SparkContext in Signaling.cancelOnInterrupt
Repository: spark
Updated Branches:
refs/heads/branch-2.0 da3dfafa9 -> c561e6cfa
[SPARK-19481] [REPL] [MAVEN] Avoid leaking SparkContext in Signaling.cancelOnInterrupt
## What changes were proposed in this pull request?
`Signaling.cancelOnInterrupt` leaks a SparkContext on every call, which makes ReplSuite unstable.
This PR adds `SparkContext.getActive` so that `Signaling.cancelOnInterrupt` can look up the active `SparkContext` at signal time instead of capturing one, avoiding the leak.
## How was this patch tested?
Jenkins
Author: Shixiong Zhu <sh...@databricks.com>
Closes #16825 from zsxwing/SPARK-19481.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c561e6cf
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c561e6cf
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c561e6cf
Branch: refs/heads/branch-2.0
Commit: c561e6cfaf8e67a58fa79a1d7284b779fee4e79f
Parents: da3dfaf
Author: Shixiong Zhu <sh...@databricks.com>
Authored: Thu Feb 9 11:16:51 2017 -0800
Committer: Shixiong Zhu <sh...@databricks.com>
Committed: Wed Mar 8 12:49:53 2017 -0800
----------------------------------------------------------------------
.../scala/org/apache/spark/SparkContext.scala | 7 +++++++
.../main/scala/org/apache/spark/repl/Main.scala | 1 +
.../org/apache/spark/repl/SparkILoop.scala | 1 -
.../main/scala/org/apache/spark/repl/Main.scala | 2 +-
.../scala/org/apache/spark/repl/Signaling.scala | 20 +++++++++++---------
5 files changed, 20 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/c561e6cf/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 2abe444..daef497 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -2295,6 +2295,13 @@ object SparkContext extends Logging {
getOrCreate(new SparkConf())
}
+ /** Return the current active [[SparkContext]] if any. */
+ private[spark] def getActive: Option[SparkContext] = {
+ SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
+ Option(activeContext.get())
+ }
+ }
+
/**
* Called at the beginning of the SparkContext constructor to ensure that no SparkContext is
* running. Throws an exception if a running context is detected and logs a warning if another
http://git-wip-us.apache.org/repos/asf/spark/blob/c561e6cf/repl/scala-2.10/src/main/scala/org/apache/spark/repl/Main.scala
----------------------------------------------------------------------
diff --git a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/Main.scala b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/Main.scala
index 7b4e14b..fba321b 100644
--- a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/Main.scala
+++ b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/Main.scala
@@ -22,6 +22,7 @@ import org.apache.spark.internal.Logging
object Main extends Logging {
initializeLogIfNecessary(true)
+ Signaling.cancelOnInterrupt()
private var _interp: SparkILoop = _
http://git-wip-us.apache.org/repos/asf/spark/blob/c561e6cf/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala
----------------------------------------------------------------------
diff --git a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala
index e017aa4..b7237a6 100644
--- a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala
+++ b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala
@@ -1027,7 +1027,6 @@ class SparkILoop(
builder.getOrCreate()
}
sparkContext = sparkSession.sparkContext
- Signaling.cancelOnInterrupt(sparkContext)
sparkSession
}
http://git-wip-us.apache.org/repos/asf/spark/blob/c561e6cf/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
----------------------------------------------------------------------
diff --git a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
index 5dfe18a..13b772b 100644
--- a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
+++ b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
@@ -30,6 +30,7 @@ import org.apache.spark.util.Utils
object Main extends Logging {
initializeLogIfNecessary(true)
+ Signaling.cancelOnInterrupt()
val conf = new SparkConf()
val rootDir = conf.getOption("spark.repl.classdir").getOrElse(Utils.getLocalDir(conf))
@@ -108,7 +109,6 @@ object Main extends Logging {
logInfo("Created Spark session")
}
sparkContext = sparkSession.sparkContext
- Signaling.cancelOnInterrupt(sparkContext)
sparkSession
}
http://git-wip-us.apache.org/repos/asf/spark/blob/c561e6cf/repl/src/main/scala/org/apache/spark/repl/Signaling.scala
----------------------------------------------------------------------
diff --git a/repl/src/main/scala/org/apache/spark/repl/Signaling.scala b/repl/src/main/scala/org/apache/spark/repl/Signaling.scala
index 202febf..9577e0e 100644
--- a/repl/src/main/scala/org/apache/spark/repl/Signaling.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/Signaling.scala
@@ -28,15 +28,17 @@ private[repl] object Signaling extends Logging {
* when no jobs are currently running.
* This makes it possible to interrupt a running shell job by pressing Ctrl+C.
*/
- def cancelOnInterrupt(ctx: SparkContext): Unit = SignalUtils.register("INT") {
- if (!ctx.statusTracker.getActiveJobIds().isEmpty) {
- logWarning("Cancelling all active jobs, this can take a while. " +
- "Press Ctrl+C again to exit now.")
- ctx.cancelAllJobs()
- true
- } else {
- false
- }
+ def cancelOnInterrupt(): Unit = SignalUtils.register("INT") {
+ SparkContext.getActive.map { ctx =>
+ if (!ctx.statusTracker.getActiveJobIds().isEmpty) {
+ logWarning("Cancelling all active jobs, this can take a while. " +
+ "Press Ctrl+C again to exit now.")
+ ctx.cancelAllJobs()
+ true
+ } else {
+ false
+ }
+ }.getOrElse(false)
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org