You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2014/04/25 05:21:15 UTC
git commit: [SPARK-986]: Job cancelation for PySpark
Repository: spark
Updated Branches:
refs/heads/master ee6f7e22a -> e53eb4f01
[SPARK-986]: Job cancelation for PySpark
* Additions to the PySpark API to cancel jobs
* Monitor Thread in PythonRDD to kill Python workers if a task is interrupted
Author: Ahir Reddy <ah...@gmail.com>
Closes #541 from ahirreddy/python-cancel and squashes the following commits:
dfdf447 [Ahir Reddy] Changed success -> completed and made logging message clearer
6c860ab [Ahir Reddy] PR Comments
4b4100a [Ahir Reddy] Success flag
adba6ed [Ahir Reddy] Destroy python workers
27a2f8f [Ahir Reddy] Start the writer thread...
d422f7b [Ahir Reddy] Remove unnecesssary vals
adda337 [Ahir Reddy] Busy wait on the ocntext.interrupted flag, and then kill the python worker
d9e472f [Ahir Reddy] Revert "removed unnecessary vals"
5b9cae5 [Ahir Reddy] removed unnecessary vals
07b54d9 [Ahir Reddy] Fix canceling unit test
8ae9681 [Ahir Reddy] Don't interrupt worker
7722342 [Ahir Reddy] Monitor Thread for python workers
db04e16 [Ahir Reddy] Added canceling api to PySpark
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e53eb4f0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e53eb4f0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e53eb4f0
Branch: refs/heads/master
Commit: e53eb4f0159ebd4d72c4bbc3586fdfc66ccacab7
Parents: ee6f7e2
Author: Ahir Reddy <ah...@gmail.com>
Authored: Thu Apr 24 20:21:10 2014 -0700
Committer: Matei Zaharia <ma...@databricks.com>
Committed: Thu Apr 24 20:21:10 2014 -0700
----------------------------------------------------------------------
.../main/scala/org/apache/spark/SparkEnv.scala | 8 +++
.../org/apache/spark/api/python/PythonRDD.scala | 30 ++++++++++-
python/pyspark/context.py | 52 ++++++++++++++++++--
3 files changed, 86 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/e53eb4f0/core/src/main/scala/org/apache/spark/SparkEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 915315e..bea435e 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -97,6 +97,14 @@ class SparkEnv (
pythonWorkers.getOrElseUpdate(key, new PythonWorkerFactory(pythonExec, envVars)).create()
}
}
+
+ private[spark]
+ def destroyPythonWorker(pythonExec: String, envVars: Map[String, String]) {
+ synchronized {
+ val key = (pythonExec, envVars)
+ pythonWorkers(key).stop()
+ }
+ }
}
object SparkEnv extends Logging {
http://git-wip-us.apache.org/repos/asf/spark/blob/e53eb4f0/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 0d71fdb..1498b01 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -110,13 +110,41 @@ private[spark] class PythonRDD[T: ClassTag](
}
}.start()
+ // Necessary to distinguish between a task that has failed and a task that is finished
+ @volatile var complete: Boolean = false
+
+ // It is necessary to have a monitor thread for python workers if the user cancels with
+ // interrupts disabled. In that case we will need to explicitly kill the worker, otherwise the
+ // threads can block indefinitely.
+ new Thread(s"Worker Monitor for $pythonExec") {
+ override def run() {
+ // Kill the worker if it is interrupted or completed
+ // When a python task completes, the context is always set to interupted
+ while (!context.interrupted) {
+ Thread.sleep(2000)
+ }
+ if (!complete) {
+ try {
+ logWarning("Incomplete task interrupted: Attempting to kill Python Worker")
+ env.destroyPythonWorker(pythonExec, envVars.toMap)
+ } catch {
+ case e: Exception =>
+ logError("Exception when trying to kill worker", e)
+ }
+ }
+ }
+ }.start()
+
/*
* Partial fix for SPARK-1019: Attempts to stop reading the input stream since
* other completion callbacks might invalidate the input. Because interruption
* is not synchronous this still leaves a potential race where the interruption is
* processed only after the stream becomes invalid.
*/
- context.addOnCompleteCallback(() => context.interrupted = true)
+ context.addOnCompleteCallback{ () =>
+ complete = true // Indicate that the task has completed successfully
+ context.interrupted = true
+ }
// Return an iterator that read lines from the process's stdout
val stream = new DataInputStream(new BufferedInputStream(worker.getInputStream, bufferSize))
http://git-wip-us.apache.org/repos/asf/spark/blob/e53eb4f0/python/pyspark/context.py
----------------------------------------------------------------------
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index f63cc4a..c74dc5f 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -429,7 +429,7 @@ class SparkContext(object):
storageLevel.deserialized,
storageLevel.replication)
- def setJobGroup(self, groupId, description):
+ def setJobGroup(self, groupId, description, interruptOnCancel=False):
"""
Assigns a group ID to all the jobs started by this thread until the group ID is set to a
different value or cleared.
@@ -437,8 +437,41 @@ class SparkContext(object):
Often, a unit of execution in an application consists of multiple Spark actions or jobs.
Application programmers can use this method to group all those jobs together and give a
group description. Once set, the Spark web UI will associate such jobs with this group.
- """
- self._jsc.setJobGroup(groupId, description)
+
+ The application can use L{SparkContext.cancelJobGroup} to cancel all
+ running jobs in this group.
+
+ >>> import thread, threading
+ >>> from time import sleep
+ >>> result = "Not Set"
+ >>> lock = threading.Lock()
+ >>> def map_func(x):
+ ... sleep(100)
+ ... return x * x
+ >>> def start_job(x):
+ ... global result
+ ... try:
+ ... sc.setJobGroup("job_to_cancel", "some description")
+ ... result = sc.parallelize(range(x)).map(map_func).collect()
+ ... except Exception as e:
+ ... result = "Cancelled"
+ ... lock.release()
+ >>> def stop_job():
+ ... sleep(5)
+ ... sc.cancelJobGroup("job_to_cancel")
+ >>> supress = lock.acquire()
+ >>> supress = thread.start_new_thread(start_job, (10,))
+ >>> supress = thread.start_new_thread(stop_job, tuple())
+ >>> supress = lock.acquire()
+ >>> print result
+ Cancelled
+
+ If interruptOnCancel is set to true for the job group, then job cancellation will result
+ in Thread.interrupt() being called on the job's executor threads. This is useful to help ensure
+ that the tasks are actually stopped in a timely manner, but is off by default due to HDFS-1208,
+ where HDFS may respond to Thread.interrupt() by marking nodes as dead.
+ """
+ self._jsc.setJobGroup(groupId, description, interruptOnCancel)
def setLocalProperty(self, key, value):
"""
@@ -460,6 +493,19 @@ class SparkContext(object):
"""
return self._jsc.sc().sparkUser()
+ def cancelJobGroup(self, groupId):
+ """
+ Cancel active jobs for the specified group. See L{SparkContext.setJobGroup}
+ for more information.
+ """
+ self._jsc.sc().cancelJobGroup(groupId)
+
+ def cancelAllJobs(self):
+ """
+ Cancel all jobs that have been scheduled or are running.
+ """
+ self._jsc.sc().cancelAllJobs()
+
def _test():
import atexit
import doctest