Posted to commits@spark.apache.org by ma...@apache.org on 2014/04/25 05:21:15 UTC

git commit: [SPARK-986]: Job cancelation for PySpark

Repository: spark
Updated Branches:
  refs/heads/master ee6f7e22a -> e53eb4f01


[SPARK-986]: Job cancelation for PySpark

* Additions to the PySpark API to cancel jobs
* Monitor Thread in PythonRDD to kill Python workers if a task is interrupted
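
A minimal driver-side sketch of how the new API can be used (a usage illustration,
not part of the diff; it assumes a live SparkContext named sc, and the group name
and sleep times are arbitrary):

    import threading
    import time

    def run_job():
        # Tag all jobs started by this thread; interruptOnCancel=True asks Spark
        # to interrupt the executor threads on cancellation (see the HDFS-1208
        # note in the setJobGroup docstring below).
        sc.setJobGroup("demo_group", "long-running demo job", interruptOnCancel=True)
        try:
            sc.parallelize(range(4)).map(lambda x: time.sleep(60) or x).collect()
        except Exception:
            print "Cancelled"

    t = threading.Thread(target=run_job)
    t.start()
    time.sleep(5)
    sc.cancelJobGroup("demo_group")   # or sc.cancelAllJobs() to cancel everything
    t.join()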

Author: Ahir Reddy <ah...@gmail.com>

Closes #541 from ahirreddy/python-cancel and squashes the following commits:

dfdf447 [Ahir Reddy] Changed success -> completed and made logging message clearer
6c860ab [Ahir Reddy] PR Comments
4b4100a [Ahir Reddy] Success flag
adba6ed [Ahir Reddy] Destroy python workers
27a2f8f [Ahir Reddy] Start the writer thread...
d422f7b [Ahir Reddy] Remove unnecessary vals
adba6ed [Ahir Reddy] Busy wait on the context.interrupted flag, and then kill the python worker
d9e472f [Ahir Reddy] Revert "removed unnecessary vals"
5b9cae5 [Ahir Reddy] removed unnecessary vals
07b54d9 [Ahir Reddy] Fix canceling unit test
8ae9681 [Ahir Reddy] Don't interrupt worker
7722342 [Ahir Reddy] Monitor Thread for python workers
db04e16 [Ahir Reddy] Added canceling api to PySpark


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e53eb4f0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e53eb4f0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e53eb4f0

Branch: refs/heads/master
Commit: e53eb4f0159ebd4d72c4bbc3586fdfc66ccacab7
Parents: ee6f7e2
Author: Ahir Reddy <ah...@gmail.com>
Authored: Thu Apr 24 20:21:10 2014 -0700
Committer: Matei Zaharia <ma...@databricks.com>
Committed: Thu Apr 24 20:21:10 2014 -0700

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/SparkEnv.scala  |  8 +++
 .../org/apache/spark/api/python/PythonRDD.scala | 30 ++++++++++-
 python/pyspark/context.py                       | 52 ++++++++++++++++++--
 3 files changed, 86 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e53eb4f0/core/src/main/scala/org/apache/spark/SparkEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 915315e..bea435e 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -97,6 +97,14 @@ class SparkEnv (
       pythonWorkers.getOrElseUpdate(key, new PythonWorkerFactory(pythonExec, envVars)).create()
     }
   }
+
+  private[spark]
+  def destroyPythonWorker(pythonExec: String, envVars: Map[String, String]) {
+    synchronized {
+      val key = (pythonExec, envVars)
+      pythonWorkers(key).stop()
+    }
+  }
 }
 
 object SparkEnv extends Logging {

http://git-wip-us.apache.org/repos/asf/spark/blob/e53eb4f0/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 0d71fdb..1498b01 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -110,13 +110,41 @@ private[spark] class PythonRDD[T: ClassTag](
       }
     }.start()
 
+    // Necessary to distinguish between a task that has failed and a task that is finished
+    @volatile var complete: Boolean = false
+
+    // It is necessary to have a monitor thread for python workers if the user cancels with
+    // interrupts disabled. In that case we will need to explicitly kill the worker, otherwise the
+    // threads can block indefinitely.
+    new Thread(s"Worker Monitor for $pythonExec") {
+      override def run() {
+        // Wait for the task to be interrupted or completed
+        // When a Python task completes, the completion callback always sets context.interrupted
+        while (!context.interrupted) {
+          Thread.sleep(2000)
+        }
+        if (!complete) {
+          try {
+            logWarning("Incomplete task interrupted: Attempting to kill Python Worker")
+            env.destroyPythonWorker(pythonExec, envVars.toMap)
+          } catch {
+            case e: Exception =>
+              logError("Exception when trying to kill worker", e)
+          }
+        }
+      }
+    }.start()
+
     /*
      * Partial fix for SPARK-1019: Attempts to stop reading the input stream since
      * other completion callbacks might invalidate the input. Because interruption
      * is not synchronous this still leaves a potential race where the interruption is
      * processed only after the stream becomes invalid.
      */
-    context.addOnCompleteCallback(() => context.interrupted = true)
+    context.addOnCompleteCallback{ () =>
+      complete = true // Indicate that the task has completed successfully
+      context.interrupted = true
+    }
 
     // Return an iterator that reads lines from the process's stdout
     val stream = new DataInputStream(new BufferedInputStream(worker.getInputStream, bufferSize))
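
For intuition, a simplified Python analogue of the monitor thread added above (the
TaskContext class and worker object here are hypothetical stand-ins; the real
implementation is the Scala code in this diff):

    import threading
    import time

    class TaskContext(object):
        # Hypothetical stand-in for the flags the Scala monitor thread reads
        def __init__(self):
            self.interrupted = False  # set on cancellation and by the completion callback
            self.complete = False     # set only when the task finishes successfully

    def start_monitor(context, worker):
        # worker stands in for the Python worker process, e.g. a subprocess.Popen
        def run():
            while not context.interrupted:
                time.sleep(2)         # same 2-second busy-wait as the Scala monitor
            if not context.complete:
                worker.kill()         # cancelled before completion: free the blocked reader/writer threads
        t = threading.Thread(target=run, name="Worker Monitor")
        t.daemon = True
        t.start()
        return t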

http://git-wip-us.apache.org/repos/asf/spark/blob/e53eb4f0/python/pyspark/context.py
----------------------------------------------------------------------
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index f63cc4a..c74dc5f 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -429,7 +429,7 @@ class SparkContext(object):
                                storageLevel.deserialized,
                                storageLevel.replication)
 
-    def setJobGroup(self, groupId, description):
+    def setJobGroup(self, groupId, description, interruptOnCancel=False):
         """
         Assigns a group ID to all the jobs started by this thread until the group ID is set to a
         different value or cleared.
@@ -437,8 +437,41 @@ class SparkContext(object):
         Often, a unit of execution in an application consists of multiple Spark actions or jobs.
         Application programmers can use this method to group all those jobs together and give a
         group description. Once set, the Spark web UI will associate such jobs with this group.
-        """
-        self._jsc.setJobGroup(groupId, description)
+
+        The application can use L{SparkContext.cancelJobGroup} to cancel all
+        running jobs in this group.
+
+        >>> import thread, threading
+        >>> from time import sleep
+        >>> result = "Not Set"
+        >>> lock = threading.Lock()
+        >>> def map_func(x):
+        ...     sleep(100)
+        ...     return x * x
+        >>> def start_job(x):
+        ...     global result
+        ...     try:
+        ...         sc.setJobGroup("job_to_cancel", "some description")
+        ...         result = sc.parallelize(range(x)).map(map_func).collect()
+        ...     except Exception as e:
+        ...         result = "Cancelled"
+        ...     lock.release()
+        >>> def stop_job():
+        ...     sleep(5)
+        ...     sc.cancelJobGroup("job_to_cancel")
+        >>> suppress = lock.acquire()
+        >>> suppress = thread.start_new_thread(start_job, (10,))
+        >>> suppress = thread.start_new_thread(stop_job, tuple())
+        >>> suppress = lock.acquire()
+        >>> print result
+        Cancelled
+
+        If interruptOnCancel is set to true for the job group, then job cancellation will result
+        in Thread.interrupt() being called on the job's executor threads. This is useful to help ensure
+        that the tasks are actually stopped in a timely manner, but is off by default due to HDFS-1208,
+        where HDFS may respond to Thread.interrupt() by marking nodes as dead.
+        """
+        self._jsc.setJobGroup(groupId, description, interruptOnCancel)
 
     def setLocalProperty(self, key, value):
         """
@@ -460,6 +493,19 @@ class SparkContext(object):
         """
         return self._jsc.sc().sparkUser()
 
+    def cancelJobGroup(self, groupId):
+        """
+        Cancel active jobs for the specified group. See L{SparkContext.setJobGroup}
+        for more information.
+        """
+        self._jsc.sc().cancelJobGroup(groupId)
+
+    def cancelAllJobs(self):
+        """
+        Cancel all jobs that have been scheduled or are running.
+        """
+        self._jsc.sc().cancelAllJobs()
+
 def _test():
     import atexit
     import doctest