Posted to commits@spark.apache.org by ka...@apache.org on 2017/01/05 22:52:05 UTC

spark git commit: [SPARK-14958][CORE] Failed task not handled when there's error deserializing failure reason

Repository: spark
Updated Branches:
  refs/heads/master 30345c43b -> f5d18af6a


[SPARK-14958][CORE] Failed task not handled when there's error deserializing failure reason

## What changes were proposed in this pull request?

TaskResultGetter deserializes the TaskEndReason before notifying the scheduler of the failed task. If deserialization throws, the failed task is never handled, and the job hangs because the scheduler still thinks the task is running. This PR moves the call to scheduler.handleFailedTask into a finally block, so the scheduler is notified even when deserializing the failure reason fails.
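
A minimal sketch of the pattern (simplified stand-ins, not the actual Spark classes): `reason` defaults to `UnknownReason`, deserialization may refine it, and `handleFailedTask` runs in the `finally` block, so it fires even when a non-`Exception` `Throwable` such as `NoClassDefFoundError` escapes the `catch` clauses.

```scala
import java.nio.ByteBuffer

object FinallyPatternSketch {
  sealed trait TaskEndReason
  case object UnknownReason extends TaskEndReason

  // Hypothetical stand-in: pretend the serialized reason references a class
  // that is missing on this side, so deserialization throws an Error.
  def deserializeReason(data: ByteBuffer): TaskEndReason =
    throw new NoClassDefFoundError("user exception class not found")

  def handleFailedTask(tid: Long, reason: TaskEndReason): Unit =
    println(s"scheduler notified: task $tid failed with $reason")

  def enqueueFailedTask(tid: Long, data: ByteBuffer): Unit = {
    var reason: TaskEndReason = UnknownReason
    try {
      reason = deserializeReason(data)
    } catch {
      case _: ClassNotFoundException => // logged in the real code
      case _: Exception => // no-op, as in the real code
    } finally {
      // NoClassDefFoundError is an Error, not an Exception, so it skips the
      // catch clauses above; the finally block still reports the failure
      // before the error kills this Runnable.
      handleFailedTask(tid, reason)
    }
  }

  def main(args: Array[String]): Unit = {
    try enqueueFailedTask(1L, ByteBuffer.allocate(0))
    catch { case e: NoClassDefFoundError => println(s"runnable died: $e") }
  }
}
```

Running the sketch prints the scheduler notification (with UnknownReason) before the error kills the worker, which is exactly the behavior the patch restores.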
## How was this patch tested?

In my case, I hit a NoClassDefFoundError while deserializing the failure reason, and the job hung. Manually verified that the patch fixes it.
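
The regression test added below reproduces this with Java serialization's private `readObject` hook. As a standalone illustration of that hook (hypothetical names, not part of the patch), an object like this serializes fine but throws on deserialization:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream,
  ObjectInputStream, ObjectOutputStream}

object ReadObjectHookDemo {
  // Java serialization invokes a private readObject(ObjectInputStream)
  // method reflectively during deserialization, if one is defined.
  private class UndeserializableException extends Exception {
    private def readObject(in: ObjectInputStream): Unit =
      throw new NoClassDefFoundError("simulated missing user class")
  }

  def main(args: Array[String]): Unit = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(new UndeserializableException) // serializing succeeds
    out.close()

    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    try in.readObject() // the private readObject hook fires and throws
    catch { case e: NoClassDefFoundError => println(s"deserialization failed: $e") }
  }
}
```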

Author: Rui Li <ru...@intel.com>
Author: Rui Li <li...@apache.org>
Author: Rui Li <sh...@cn.ibm.com>

Closes #12775 from lirui-intel/SPARK-14958.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f5d18af6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f5d18af6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f5d18af6

Branch: refs/heads/master
Commit: f5d18af6a8a0b9f8c2e9677f9d8ae1712eb701c6
Parents: 30345c4
Author: Rui Li <ru...@intel.com>
Authored: Thu Jan 5 14:51:13 2017 -0800
Committer: Kay Ousterhout <ka...@gmail.com>
Committed: Thu Jan 5 14:51:13 2017 -0800

----------------------------------------------------------------------
 .../spark/scheduler/TaskResultGetter.scala      |  6 +++++-
 .../spark/scheduler/TaskResultGetterSuite.scala | 21 +++++++++++++++++++-
 2 files changed, 25 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f5d18af6/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
index b1addc1..a284f79 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
@@ -143,8 +143,12 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
               logError(
                 "Could not deserialize TaskEndReason: ClassNotFound with classloader " + loader)
             case ex: Exception => // No-op
+          } finally {
+            // If there's an error while deserializing the TaskEndReason, this Runnable
+            // will die. Still tell the scheduler about the task failure, to avoid a hang
+            // where the scheduler thinks the task is still running.
+            scheduler.handleFailedTask(taskSetManager, tid, taskState, reason)
           }
-          scheduler.handleFailedTask(taskSetManager, tid, taskState, reason)
         }
       })
     } catch {

http://git-wip-us.apache.org/repos/asf/spark/blob/f5d18af6/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
index c9e682f..3e55d39 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.scheduler
 
-import java.io.File
+import java.io.{File, ObjectInputStream}
 import java.net.URL
 import java.nio.ByteBuffer
 
@@ -248,5 +248,24 @@ class TaskResultGetterSuite extends SparkFunSuite with BeforeAndAfter with Local
     assert(resSizeAfter.exists(_.toString.toLong > 0L))
   }
 
+  test("failed task is handled when error occurs deserializing the reason") {
+    sc = new SparkContext("local", "test", conf)
+    val rdd = sc.parallelize(Seq(1), 1).map { _ =>
+      throw new UndeserializableException
+    }
+    val message = intercept[SparkException] {
+      rdd.collect()
+    }.getMessage
+    // Job failed, even though the failure reason is unknown.
+    val unknownFailure = """(?s).*Lost task.*: UnknownReason.*""".r
+    assert(unknownFailure.findFirstMatchIn(message).isDefined)
+  }
+
+}
+
+private class UndeserializableException extends Exception {
+  private def readObject(in: ObjectInputStream): Unit = {
+    throw new NoClassDefFoundError()
+  }
 }
 

