Posted to commits@spark.apache.org by an...@apache.org on 2015/09/23 01:35:47 UTC

spark git commit: [SPARK-10640] History server fails to parse TaskCommitDenied

Repository: spark
Updated Branches:
  refs/heads/master a96ba40f7 -> 61d4c07f4


[SPARK-10640] History server fails to parse TaskCommitDenied

... simply because the JSON de/serialization code for this task end reason is missing!
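
For context, a minimal standalone sketch of the failure mode (hypothetical names, not Spark source): JsonProtocol dispatches on the reason's class name when reading an event log, so a name with no matching case aborts history-log replay with a scala.MatchError.

    object MissingCaseSketch {
      // Deserialization dispatches on the reason's class name; a name
      // without a case throws scala.MatchError at replay time.
      def taskEndReasonFromName(name: String): String = name match {
        case "TaskResultLost" => "TaskResultLost"
        case "TaskKilled"     => "TaskKilled"
        // no case for "TaskCommitDenied" before this patch
      }

      def main(args: Array[String]): Unit =
        taskEndReasonFromName("TaskCommitDenied") // throws scala.MatchError
    }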

Author: Andrew Or <an...@databricks.com>

Closes #8828 from andrewor14/task-end-reason-json.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/61d4c07f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/61d4c07f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/61d4c07f

Branch: refs/heads/master
Commit: 61d4c07f4becb42f054e588be56ed13239644410
Parents: a96ba40
Author: Andrew Or <an...@databricks.com>
Authored: Tue Sep 22 16:35:43 2015 -0700
Committer: Andrew Or <an...@databricks.com>
Committed: Tue Sep 22 16:35:43 2015 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/TaskEndReason.scala     |  6 +++++-
 .../scala/org/apache/spark/util/JsonProtocol.scala | 13 +++++++++++++
 .../org/apache/spark/util/JsonProtocolSuite.scala  | 17 +++++++++++++++++
 3 files changed, 35 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/61d4c07f/core/src/main/scala/org/apache/spark/TaskEndReason.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
index 7137246..9335c5f 100644
--- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala
+++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -17,13 +17,17 @@
 
 package org.apache.spark
 
-import java.io.{IOException, ObjectInputStream, ObjectOutputStream}
+import java.io.{ObjectInputStream, ObjectOutputStream}
 
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.storage.BlockManagerId
 import org.apache.spark.util.Utils
 
+// ==============================================================================================
+// NOTE: new task end reasons MUST be accompanied with serialization logic in util.JsonProtocol!
+// ==============================================================================================
+
 /**
  * :: DeveloperApi ::
  * Various possible reasons why a task ended. The low-level TaskScheduler is supposed to retry
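
For reference, a sketch of the TaskCommitDenied shape the new serialization logic relies on, reconstructed from the accessor calls in the JsonProtocol hunk below (just the fields the JSON code touches; the real class lives in org.apache.spark and extends TaskFailedReason):

    // Field names taken from the taskCommitDenied.* accessors in the patch.
    case class TaskCommitDenied(jobID: Int, partitionID: Int, attemptNumber: Int)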

http://git-wip-us.apache.org/repos/asf/spark/blob/61d4c07f/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 99614a7..40729fa 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -362,6 +362,10 @@ private[spark] object JsonProtocol {
         ("Stack Trace" -> stackTrace) ~
         ("Full Stack Trace" -> exceptionFailure.fullStackTrace) ~
         ("Metrics" -> metrics)
+      case taskCommitDenied: TaskCommitDenied =>
+        ("Job ID" -> taskCommitDenied.jobID) ~
+        ("Partition ID" -> taskCommitDenied.partitionID) ~
+        ("Attempt Number" -> taskCommitDenied.attemptNumber)
       case ExecutorLostFailure(executorId, isNormalExit) =>
         ("Executor ID" -> executorId) ~
         ("Normal Exit" -> isNormalExit)
@@ -770,6 +774,7 @@ private[spark] object JsonProtocol {
     val exceptionFailure = Utils.getFormattedClassName(ExceptionFailure)
     val taskResultLost = Utils.getFormattedClassName(TaskResultLost)
     val taskKilled = Utils.getFormattedClassName(TaskKilled)
+    val taskCommitDenied = Utils.getFormattedClassName(TaskCommitDenied)
     val executorLostFailure = Utils.getFormattedClassName(ExecutorLostFailure)
     val unknownReason = Utils.getFormattedClassName(UnknownReason)
 
@@ -794,6 +799,14 @@ private[spark] object JsonProtocol {
         ExceptionFailure(className, description, stackTrace, fullStackTrace, metrics, None)
       case `taskResultLost` => TaskResultLost
       case `taskKilled` => TaskKilled
+      case `taskCommitDenied` =>
+        // Unfortunately, the `TaskCommitDenied` message was introduced in 1.3.0 but the JSON
+        // de/serialization logic was not added until 1.5.1. To provide backward compatibility
+        // for reading those logs, we need to provide default values for all the fields.
+        val jobId = Utils.jsonOption(json \ "Job ID").map(_.extract[Int]).getOrElse(-1)
+        val partitionId = Utils.jsonOption(json \ "Partition ID").map(_.extract[Int]).getOrElse(-1)
+        val attemptNo = Utils.jsonOption(json \ "Attempt Number").map(_.extract[Int]).getOrElse(-1)
+        TaskCommitDenied(jobId, partitionId, attemptNo)
       case `executorLostFailure` =>
         val isNormalExit = Utils.jsonOption(json \ "Normal Exit").
           map(_.extract[Boolean])
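
Taken together, the two hunks above write TaskCommitDenied's three fields and read them back with -1 defaults, so logs written by Spark 1.3.0 through 1.5.0 (which name the reason but carry no fields) still parse. A self-contained sketch of that round trip using json4s, the library JsonProtocol is built on; the object and method names here are illustrative, not Spark's:

    import org.json4s._
    import org.json4s.JsonDSL._
    import org.json4s.jackson.JsonMethods._

    object TaskCommitDeniedJsonSketch {
      // Stand-in for the real org.apache.spark.TaskCommitDenied.
      case class TaskCommitDenied(jobID: Int, partitionID: Int, attemptNumber: Int)

      // Mirrors the new toJson branch above.
      def toJson(denied: TaskCommitDenied): JValue =
        ("Job ID" -> denied.jobID) ~
        ("Partition ID" -> denied.partitionID) ~
        ("Attempt Number" -> denied.attemptNumber)

      // Mirrors the new fromJson branch: default every field to -1 so
      // logs written by Spark 1.3.0 through 1.5.0 still parse.
      def fromJson(json: JValue): TaskCommitDenied = {
        implicit val formats: Formats = DefaultFormats
        def intField(name: String): Int =
          (json \ name).toOption.map(_.extract[Int]).getOrElse(-1)
        TaskCommitDenied(
          intField("Job ID"), intField("Partition ID"), intField("Attempt Number"))
      }

      def main(args: Array[String]): Unit = {
        val json = toJson(TaskCommitDenied(2, 3, 4))
        println(compact(render(json)))
        // {"Job ID":2,"Partition ID":3,"Attempt Number":4}
        assert(fromJson(json) == TaskCommitDenied(2, 3, 4))
        assert(fromJson(JObject()) == TaskCommitDenied(-1, -1, -1)) // old-log fallback
      }
    }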

http://git-wip-us.apache.org/repos/asf/spark/blob/61d4c07f/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 143c1b9..a24bf29 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -151,6 +151,7 @@ class JsonProtocolSuite extends SparkFunSuite {
     testTaskEndReason(exceptionFailure)
     testTaskEndReason(TaskResultLost)
     testTaskEndReason(TaskKilled)
+    testTaskEndReason(TaskCommitDenied(2, 3, 4))
     testTaskEndReason(ExecutorLostFailure("100", true))
     testTaskEndReason(UnknownReason)
 
@@ -352,6 +353,17 @@ class JsonProtocolSuite extends SparkFunSuite {
     assertEquals(expectedStageInfo, JsonProtocol.stageInfoFromJson(oldStageInfo))
   }
 
+  // `TaskCommitDenied` was added in 1.3.0 but JSON de/serialization logic was added in 1.5.1
+  test("TaskCommitDenied backward compatibility") {
+    val denied = TaskCommitDenied(1, 2, 3)
+    val oldDenied = JsonProtocol.taskEndReasonToJson(denied)
+      .removeField({ _._1 == "Job ID" })
+      .removeField({ _._1 == "Partition ID" })
+      .removeField({ _._1 == "Attempt Number" })
+    val expectedDenied = TaskCommitDenied(-1, -1, -1)
+    assertEquals(expectedDenied, JsonProtocol.taskEndReasonFromJson(oldDenied))
+  }
+
   /** -------------------------- *
    | Helper test running methods |
    * --------------------------- */
@@ -577,6 +589,11 @@ class JsonProtocolSuite extends SparkFunSuite {
         assertOptionEquals(r1.metrics, r2.metrics, assertTaskMetricsEquals)
       case (TaskResultLost, TaskResultLost) =>
       case (TaskKilled, TaskKilled) =>
+      case (TaskCommitDenied(jobId1, partitionId1, attemptNumber1),
+          TaskCommitDenied(jobId2, partitionId2, attemptNumber2)) =>
+        assert(jobId1 === jobId2)
+        assert(partitionId1 === partitionId2)
+        assert(attemptNumber1 === attemptNumber2)
       case (ExecutorLostFailure(execId1, isNormalExit1),
           ExecutorLostFailure(execId2, isNormalExit2)) =>
         assert(execId1 === execId2)
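
Concretely, the backward-compatibility test above simulates the two log shapes in play. Illustrative fragments only, assuming (as elsewhere in JsonProtocol) that each reason is tagged with a "Reason" field naming its class:

    // New logs (1.5.1+) carry all three fields:
    val newEntry =
      """{"Reason":"TaskCommitDenied","Job ID":1,"Partition ID":2,"Attempt Number":3}"""
    // Old logs (1.3.0 - 1.5.0) name the reason but carry no fields; parsing
    // them must yield TaskCommitDenied(-1, -1, -1), as the test asserts.
    val oldEntry = """{"Reason":"TaskCommitDenied"}"""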


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org