You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by GitBox <gi...@apache.org> on 2019/01/15 10:48:31 UTC

[spark] Diff for: [GitHub] liupc closed pull request #23537: [SPARK-26614]Fix speculation kill might cause job failure

diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index a30a501e5d4a1..f92ea7873f9ce 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -606,17 +606,25 @@ private[spark] class Executor(
           if (!ShutdownHookManager.inShutdown()) {
             val (accums, accUpdates) = collectAccumulatorsAndResetStatusOnFailure(taskStartTime)
 
-            val serializedTaskEndReason = {
+            val (state, serializedTaskEndReason) = {
               try {
-                ser.serialize(new ExceptionFailure(t, accUpdates).withAccums(accums))
+                if (task.reasonIfKilled.isDefined) {
+                  val killReason = task.reasonIfKilled.getOrElse("unknown reason")
+                  val serializedTK = ser.serialize(TaskKilled(killReason), accUpdates, accums)
+                  (TaskState.KILLED, serializedTK)
+                } else {
+                  (TaskState.FAILED,
+                    ser.serialize(new ExceptionFailure(t, accUpdates).withAccums(accums)))
+                }
               } catch {
                 case _: NotSerializableException =>
                   // t is not serializable so just send the stacktrace
-                  ser.serialize(new ExceptionFailure(t, accUpdates, false).withAccums(accums))
+                  (TaskState.FAILED,
+                    ser.serialize(new ExceptionFailure(t, accUpdates, false).withAccums(accums)))
               }
             }
             setTaskFinishedAndClearInterruptStatus()
-            execBackend.statusUpdate(taskId, TaskState.FAILED, serializedTaskEndReason)
+            execBackend.statusUpdate(taskId, state, serializedTaskEndReason)
           } else {
             logInfo("Not reporting error to driver during JVM shutdown.")
           }


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org