Posted to commits@spark.apache.org by zs...@apache.org on 2017/11/16 22:41:09 UTC

spark git commit: [SPARK-22535][PYSPARK] Sleep before killing the Python worker in PythonRunner.MonitorThread (branch-2.2)

Repository: spark
Updated Branches:
  refs/heads/branch-2.2 0b51fd3eb -> be68f86e1


[SPARK-22535][PYSPARK] Sleep before killing the Python worker in PythonRunner.MonitorThread (branch-2.2)

## What changes were proposed in this pull request?

Backport #19762 to 2.2

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <zs...@gmail.com>

Closes #19768 from zsxwing/SPARK-22535-2.2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/be68f86e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/be68f86e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/be68f86e

Branch: refs/heads/branch-2.2
Commit: be68f86e11d64209d9e325ce807025318f383bea
Parents: 0b51fd3
Author: Shixiong Zhu <zs...@gmail.com>
Authored: Thu Nov 16 14:41:05 2017 -0800
Committer: Shixiong Zhu <zs...@gmail.com>
Committed: Thu Nov 16 14:41:05 2017 -0800

----------------------------------------------------------------------
 .../org/apache/spark/api/python/PythonRDD.scala | 21 ++++++++++++++------
 1 file changed, 15 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/be68f86e/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 807b51f..63ae705 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -360,6 +360,9 @@ private[spark] class PythonRunner(
   class MonitorThread(env: SparkEnv, worker: Socket, context: TaskContext)
     extends Thread(s"Worker Monitor for $pythonExec") {
 
+    /** How long to wait before killing the python worker if a task cannot be interrupted. */
+    private val taskKillTimeout = env.conf.getTimeAsMs("spark.python.task.killTimeout", "2s")
+
     setDaemon(true)
 
     override def run() {
@@ -369,12 +372,18 @@ private[spark] class PythonRunner(
         Thread.sleep(2000)
       }
       if (!context.isCompleted) {
-        try {
-          logWarning("Incomplete task interrupted: Attempting to kill Python Worker")
-          env.destroyPythonWorker(pythonExec, envVars.asScala.toMap, worker)
-        } catch {
-          case e: Exception =>
-            logError("Exception when trying to kill worker", e)
+        Thread.sleep(taskKillTimeout)
+        if (!context.isCompleted) {
+          try {
+            // Mimic the task name used in `Executor` to help the user find out the task to blame.
+            val taskName = s"${context.partitionId}.${context.taskAttemptId} " +
+              s"in stage ${context.stageId} (TID ${context.taskAttemptId})"
+            logWarning(s"Incomplete task $taskName interrupted: Attempting to kill Python Worker")
+            env.destroyPythonWorker(pythonExec, envVars.asScala.toMap, worker)
+          } catch {
+            case e: Exception =>
+              logError("Exception when trying to kill worker", e)
+          }
         }
       }
     }
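
A minimal sketch (not part of the commit) of how a user could raise the new timeout from application code. The configuration key "spark.python.task.killTimeout" and its default of "2s" come from the diff above, where MonitorThread reads it via env.conf.getTimeAsMs; the app name and master below are placeholders for illustration only.

    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext

    object KillTimeoutExample {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("python-kill-timeout-example") // placeholder app name
          .setMaster("local[2]")                     // placeholder master
          // Wait 10s (instead of the default 2s) before the monitor thread
          // destroys a Python worker whose task did not stop after interruption.
          .set("spark.python.task.killTimeout", "10s")
        val sc = new SparkContext(conf)
        // ... run PySpark-backed jobs; MonitorThread applies the timeout ...
        sc.stop()
      }
    }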

