Posted to commits@spark.apache.org by we...@apache.org on 2017/07/23 15:23:40 UTC

spark git commit: [SPARK-20904][CORE] Don't report task failures to driver during shutdown.

Repository: spark
Updated Branches:
  refs/heads/master ccaee5b54 -> cecd285a2


[SPARK-20904][CORE] Don't report task failures to driver during shutdown.

Executors run tasks in a thread pool of daemon threads. Those threads
remain active while the JVM is shutting down, so the tasks they run are
affected by code that runs in shutdown hooks.
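A minimal Scala sketch of the kind of pool described above, assuming a fixed-size pool built with java.util.concurrent. The DaemonPool object and its names are hypothetical and are not Spark's actual Executor code. Each worker is marked as a daemon, so it never keeps the JVM alive on its own, and nothing stops it before shutdown hooks start running.

```scala
import java.util.concurrent.{ExecutorService, Executors, ThreadFactory}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper, loosely modeled on an executor's task-launch pool.
object DaemonPool {
  // Thread factory that names workers "<prefix>-N" and marks them as daemons.
  def daemonFactory(prefix: String): ThreadFactory = new ThreadFactory {
    private val counter = new AtomicInteger(0)

    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, s"$prefix-${counter.incrementAndGet()}")
      // Daemon threads do not keep the JVM alive, and they are not stopped
      // before shutdown hooks run, so a task can still be executing while
      // hooks tear down shared resources.
      t.setDaemon(true)
      t
    }
  }

  def newPool(prefix: String, size: Int): ExecutorService =
    Executors.newFixedThreadPool(size, daemonFactory(prefix))
}
```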

So if a shutdown hook messes with something that the task is using (e.g.
an HDFS connection), the task will fail and will report that failure to
the driver. That will make the driver mark the task as failed regardless
of what caused the executor to shut down. So, for example, if YARN pre-empted
that executor, the driver would consider that task failed when it should
instead ignore the failure.
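The race can be reproduced in miniature. The sketch below assumes a hypothetical SharedClient standing in for something like an HDFS connection, plus a hook that closes it the way a library's own shutdown hook might; none of these names come from Spark or Hadoop. Calling the hook body directly simulates it firing while a task is still running, which is what turns an unrelated shutdown into a task failure.

```scala
import java.io.IOException
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical stand-in for a client that a library closes from its own shutdown hook.
final class SharedClient {
  private val closed = new AtomicBoolean(false)
  def close(): Unit = closed.set(true)
  def read(): String =
    if (closed.get()) throw new IOException("client closed") else "data"
}

object ShutdownRace {
  // Registers a JVM shutdown hook that closes the client, as a library might.
  def installCloseHook(client: SharedClient): Thread = {
    val hook = new Thread(new Runnable { override def run(): Unit = client.close() })
    Runtime.getRuntime.addShutdownHook(hook)
    hook
  }

  // A task body that fails if the client was closed underneath it.
  def runTask(client: SharedClient): Either[Throwable, String] =
    try Right(client.read())
    catch { case e: IOException => Left(e) }
}
```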

This change avoids reporting failures to the driver while shutdown hooks
are executing. This fixes the YARN preemption accounting and changes little
in other scenarios, apart from reporting a more generic error ("Executor
lost") when the executor shuts down unexpectedly, which is arguably more
correct.
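A minimal sketch of the decision this change adds, using simplified, hypothetical names (ShutdownAwareReporting, handleTaskFailure, sendToDriver) rather than Spark's real Executor and ExecutorBackend types. Spark's own check is ShutdownHookManager.inShutdown(), as used in the diff; the probe below is one common way to write such a check, relying on the documented behavior that Runtime.addShutdownHook throws IllegalStateException once the JVM has begun shutting down. It is an assumption, not necessarily how Spark implements it.

```scala
// Hypothetical, simplified model of the failure-reporting decision.
object ShutdownAwareReporting {
  // Returns true once the JVM has started shutting down: registering a new
  // shutdown hook at that point throws IllegalStateException.
  def inShutdown(): Boolean = {
    val probe = new Thread(new Runnable { override def run(): Unit = () })
    try {
      Runtime.getRuntime.addShutdownHook(probe)
      Runtime.getRuntime.removeShutdownHook(probe)
      false
    } catch {
      case _: IllegalStateException => true
    }
  }

  // Reports the failure unless shutdown is in progress; returns whether it was sent.
  def handleTaskFailure(
      taskId: Long,
      error: Throwable,
      shuttingDown: () => Boolean,
      sendToDriver: (Long, String) => Unit,
      logInfo: String => Unit): Boolean = {
    if (!shuttingDown()) {
      sendToDriver(taskId, s"FAILED: ${error.getMessage}")
      true
    } else {
      // Stay silent; the driver will see the executor as lost instead.
      logInfo("Not reporting error to driver during JVM shutdown.")
      false
    }
  }
}
```

Passing the shutdown check in as a function keeps the decision testable without actually shutting down a JVM; in normal operation the caller would pass ShutdownAwareReporting.inShutdown.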

Tested with a hacky app running on spark-shell that tried to cause failures
only when shutdown hooks were running, and verified that preemption didn't
cause the app to fail because of task failures exceeding the threshold.

Author: Marcelo Vanzin <va...@cloudera.com>

Closes #18594 from vanzin/SPARK-20904.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cecd285a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cecd285a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cecd285a

Branch: refs/heads/master
Commit: cecd285a2aabad4e7db5a3d18944b87fbc4eee6c
Parents: ccaee5b
Author: Marcelo Vanzin <va...@cloudera.com>
Authored: Sun Jul 23 23:23:13 2017 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Sun Jul 23 23:23:13 2017 +0800

----------------------------------------------------------------------
 .../org/apache/spark/executor/Executor.scala    | 47 ++++++++++++--------
 1 file changed, 28 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/cecd285a/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 21f0db1..690b5a2 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -473,29 +473,38 @@ private[spark] class Executor(
           // the default uncaught exception handler, which will terminate the Executor.
           logError(s"Exception in $taskName (TID $taskId)", t)
 
-          // Collect latest accumulator values to report back to the driver
-          val accums: Seq[AccumulatorV2[_, _]] =
-            if (task != null) {
-              task.metrics.setExecutorRunTime(System.currentTimeMillis() - taskStart)
-              task.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime)
-              task.collectAccumulatorUpdates(taskFailed = true)
-            } else {
-              Seq.empty
-            }
+          // SPARK-20904: Do not report failure to driver if it happened during shutdown. Because
+          // libraries may set up shutdown hooks that race with running tasks during shutdown,
+          // spurious failures may occur and can result in improper accounting in the driver (e.g.
+          // the task failure would not be ignored if the shutdown happened because of preemption,
+          // instead of an app issue).
+          if (!ShutdownHookManager.inShutdown()) {
+            // Collect latest accumulator values to report back to the driver
+            val accums: Seq[AccumulatorV2[_, _]] =
+              if (task != null) {
+                task.metrics.setExecutorRunTime(System.currentTimeMillis() - taskStart)
+                task.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime)
+                task.collectAccumulatorUpdates(taskFailed = true)
+              } else {
+                Seq.empty
+              }
 
-          val accUpdates = accums.map(acc => acc.toInfo(Some(acc.value), None))
+            val accUpdates = accums.map(acc => acc.toInfo(Some(acc.value), None))
 
-          val serializedTaskEndReason = {
-            try {
-              ser.serialize(new ExceptionFailure(t, accUpdates).withAccums(accums))
-            } catch {
-              case _: NotSerializableException =>
-                // t is not serializable so just send the stacktrace
-                ser.serialize(new ExceptionFailure(t, accUpdates, false).withAccums(accums))
+            val serializedTaskEndReason = {
+              try {
+                ser.serialize(new ExceptionFailure(t, accUpdates).withAccums(accums))
+              } catch {
+                case _: NotSerializableException =>
+                  // t is not serializable so just send the stacktrace
+                  ser.serialize(new ExceptionFailure(t, accUpdates, false).withAccums(accums))
+              }
             }
+            setTaskFinishedAndClearInterruptStatus()
+            execBackend.statusUpdate(taskId, TaskState.FAILED, serializedTaskEndReason)
+          } else {
+            logInfo("Not reporting error to driver during JVM shutdown.")
           }
-          setTaskFinishedAndClearInterruptStatus()
-          execBackend.statusUpdate(taskId, TaskState.FAILED, serializedTaskEndReason)
 
           // Don't forcibly exit unless the exception was inherently fatal, to avoid
           // stopping other tasks unnecessarily.

