You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by GitBox <gi...@apache.org> on 2019/01/15 10:48:31 UTC
[spark] Diff for: [GitHub] liupc closed pull request #23537:
[SPARK-26614] Fix job failure that might be caused by a speculation kill
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index a30a501e5d4a1..f92ea7873f9ce 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -606,17 +606,25 @@ private[spark] class Executor(
if (!ShutdownHookManager.inShutdown()) {
val (accums, accUpdates) = collectAccumulatorsAndResetStatusOnFailure(taskStartTime)
- val serializedTaskEndReason = {
+ val (state, serializedTaskEndReason) = {
try {
- ser.serialize(new ExceptionFailure(t, accUpdates).withAccums(accums))
+ if (task.reasonIfKilled.isDefined) {
+ val killReason = task.reasonIfKilled.getOrElse("unknown reason")
+ val serializedTK = ser.serialize(TaskKilled(killReason), accUpdates, accums)
+ (TaskState.KILLED, serializedTK)
+ } else {
+ (TaskState.FAILED,
+ ser.serialize(new ExceptionFailure(t, accUpdates).withAccums(accums)))
+ }
} catch {
case _: NotSerializableException =>
// t is not serializable so just send the stacktrace
- ser.serialize(new ExceptionFailure(t, accUpdates, false).withAccums(accums))
+ (TaskState.FAILED,
+ ser.serialize(new ExceptionFailure(t, accUpdates, false).withAccums(accums)))
}
}
setTaskFinishedAndClearInterruptStatus()
- execBackend.statusUpdate(taskId, TaskState.FAILED, serializedTaskEndReason)
+ execBackend.statusUpdate(taskId, state, serializedTaskEndReason)
} else {
logInfo("Not reporting error to driver during JVM shutdown.")
}
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org