You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by zs...@apache.org on 2017/11/16 22:41:09 UTC
spark git commit: [SPARK-22535][PYSPARK] Sleep before killing the Python worker in PythonRunner.MonitorThread (branch-2.2)
Repository: spark
Updated Branches:
refs/heads/branch-2.2 0b51fd3eb -> be68f86e1
[SPARK-22535][PYSPARK] Sleep before killing the Python worker in PythonRunner.MonitorThread (branch-2.2)
## What changes were proposed in this pull request?
Backport #19762 to 2.2
## How was this patch tested?
Jenkins
Author: Shixiong Zhu <zs...@gmail.com>
Closes #19768 from zsxwing/SPARK-22535-2.2.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/be68f86e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/be68f86e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/be68f86e
Branch: refs/heads/branch-2.2
Commit: be68f86e11d64209d9e325ce807025318f383bea
Parents: 0b51fd3
Author: Shixiong Zhu <zs...@gmail.com>
Authored: Thu Nov 16 14:41:05 2017 -0800
Committer: Shixiong Zhu <zs...@gmail.com>
Committed: Thu Nov 16 14:41:05 2017 -0800
----------------------------------------------------------------------
.../org/apache/spark/api/python/PythonRDD.scala | 21 ++++++++++++++------
1 file changed, 15 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/be68f86e/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 807b51f..63ae705 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -360,6 +360,9 @@ private[spark] class PythonRunner(
class MonitorThread(env: SparkEnv, worker: Socket, context: TaskContext)
extends Thread(s"Worker Monitor for $pythonExec") {
+ /** How long to wait before killing the python worker if a task cannot be interrupted. */
+ private val taskKillTimeout = env.conf.getTimeAsMs("spark.python.task.killTimeout", "2s")
+
setDaemon(true)
override def run() {
@@ -369,12 +372,18 @@ private[spark] class PythonRunner(
Thread.sleep(2000)
}
if (!context.isCompleted) {
- try {
- logWarning("Incomplete task interrupted: Attempting to kill Python Worker")
- env.destroyPythonWorker(pythonExec, envVars.asScala.toMap, worker)
- } catch {
- case e: Exception =>
- logError("Exception when trying to kill worker", e)
+ Thread.sleep(taskKillTimeout)
+ if (!context.isCompleted) {
+ try {
+ // Mimic the task name used in `Executor` to help the user find out the task to blame.
+ val taskName = s"${context.partitionId}.${context.taskAttemptId} " +
+ s"in stage ${context.stageId} (TID ${context.taskAttemptId})"
+ logWarning(s"Incomplete task $taskName interrupted: Attempting to kill Python Worker")
+ env.destroyPythonWorker(pythonExec, envVars.asScala.toMap, worker)
+ } catch {
+ case e: Exception =>
+ logError("Exception when trying to kill worker", e)
+ }
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org