You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Andrew Ash (JIRA)" <ji...@apache.org> on 2017/08/11 01:24:00 UTC

[jira] [Commented] (SPARK-21564) TaskDescription decoding failure should fail the task

    [ https://issues.apache.org/jira/browse/SPARK-21564?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16122651#comment-16122651 ] 

Andrew Ash commented on SPARK-21564:
------------------------------------

[~irashid] a possible fix could look roughly like this:

{noformat}
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index a2f1aa22b0..06d72fe106 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -17,6 +17,7 @@

 package org.apache.spark.executor

+import java.io.{DataInputStream, NotSerializableException}
 import java.net.URL
 import java.nio.ByteBuffer
 import java.util.Locale
@@ -35,7 +36,7 @@ import org.apache.spark.rpc._
 import org.apache.spark.scheduler.{ExecutorLossReason, TaskDescription}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
 import org.apache.spark.serializer.SerializerInstance
-import org.apache.spark.util.{ThreadUtils, Utils}
+import org.apache.spark.util.{ByteBufferInputStream, ThreadUtils, Utils}

 private[spark] class CoarseGrainedExecutorBackend(
     override val rpcEnv: RpcEnv,
@@ -93,9 +94,26 @@ private[spark] class CoarseGrainedExecutorBackend(
       if (executor == null) {
         exitExecutor(1, "Received LaunchTask command but executor was null")
       } else {
-        val taskDesc = TaskDescription.decode(data.value)
-        logInfo("Got assigned task " + taskDesc.taskId)
-        executor.launchTask(this, taskDesc)
+        try {
+          val taskDesc = TaskDescription.decode(data.value)
+          logInfo("Got assigned task " + taskDesc.taskId)
+          executor.launchTask(this, taskDesc)
+        } catch {
+          case e: Exception =>
+            val taskId = new DataInputStream(new ByteBufferInputStream(
+              ByteBuffer.wrap(data.value.array()))).readLong()
+            val ser = env.closureSerializer.newInstance()
+            val serializedTaskEndReason = {
+              try {
+                ser.serialize(new ExceptionFailure(e, Nil))
+              } catch {
+                case _: NotSerializableException =>
+                  // e is not serializable so just send the stacktrace
+                  ser.serialize(new ExceptionFailure(e, Nil, false))
+              }
+            }
+            statusUpdate(taskId, TaskState.FAILED, serializedTaskEndReason)
+        }
       }

     case KillTask(taskId, _, interruptThread, reason) =>
{noformat}

The downside here though is that we're still making the assumption that the TaskDescription is well-formatted enough to be able to get the taskId out of it (the first long in the serialized bytes).

Any other thoughts on how to do this?

> TaskDescription decoding failure should fail the task
> -----------------------------------------------------
>
>                 Key: SPARK-21564
>                 URL: https://issues.apache.org/jira/browse/SPARK-21564
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.2.0
>            Reporter: Andrew Ash
>
> cc [~robert3005]
> I was seeing an issue where Spark was throwing this exception:
> {noformat}
> 16:16:28.294 [dispatcher-event-loop-14] ERROR org.apache.spark.rpc.netty.Inbox - Ignoring error
> java.io.EOFException: null
>     at java.io.DataInputStream.readFully(DataInputStream.java:197)
>     at java.io.DataInputStream.readUTF(DataInputStream.java:609)
>     at java.io.DataInputStream.readUTF(DataInputStream.java:564)
>     at org.apache.spark.scheduler.TaskDescription$$anonfun$decode$1.apply(TaskDescription.scala:127)
>     at org.apache.spark.scheduler.TaskDescription$$anonfun$decode$1.apply(TaskDescription.scala:126)
>     at scala.collection.immutable.Range.foreach(Range.scala:160)
>     at org.apache.spark.scheduler.TaskDescription$.decode(TaskDescription.scala:126)
>     at org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receive$1.applyOrElse(CoarseGrainedExecutorBackend.scala:95)
>     at org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:117)
>     at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:205)
>     at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:101)
>     at org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:213)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:748)
> {noformat}
> For details on the cause of that exception, see SPARK-21563
> We've since changed the application and have a proposed fix in Spark at the ticket above, but it was troubling that decoding the TaskDescription wasn't failing the tasks.  So the Spark job ended up hanging and making no progress, permanently stuck, because the driver thinks the task is running but the thread has died in the executor.
> We should make a change around https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala#L96 so that when that decode throws an exception, the task is marked as failed.
> cc [~kayousterhout] [~irashid]



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org