You are viewing a plain text version of this content; the canonical (HTML) version is available at the original mailing-list archive entry, whose link is not reproduced in this plain-text rendering.
Posted to commits@spark.apache.org by an...@apache.org on 2015/09/23 02:11:11 UTC

spark git commit: Revert "[SPARK-10640] History server fails to parse TaskCommitDenied"

Repository: spark
Updated Branches:
  refs/heads/branch-1.5 5ffd0841e -> 118ebd405


Revert "[SPARK-10640] History server fails to parse TaskCommitDenied"

This reverts commit 5ffd0841e016301807b0a008af7c3346e9f59e7a.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/118ebd40
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/118ebd40
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/118ebd40

Branch: refs/heads/branch-1.5
Commit: 118ebd405a34acedb32e9f3d1cf7b5a835e17dbb
Parents: 5ffd084
Author: Andrew Or <an...@databricks.com>
Authored: Tue Sep 22 17:10:58 2015 -0700
Committer: Andrew Or <an...@databricks.com>
Committed: Tue Sep 22 17:10:58 2015 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/TaskEndReason.scala  |  6 +-----
 .../org/apache/spark/util/JsonProtocol.scala    | 13 ------------
 .../apache/spark/util/JsonProtocolSuite.scala   | 22 --------------------
 3 files changed, 1 insertion(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/118ebd40/core/src/main/scala/org/apache/spark/TaskEndReason.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
index b50354c..7a690df 100644
--- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala
+++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -17,17 +17,13 @@
 
 package org.apache.spark
 
-import java.io.{ObjectInputStream, ObjectOutputStream}
+import java.io.{IOException, ObjectInputStream, ObjectOutputStream}
 
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.storage.BlockManagerId
 import org.apache.spark.util.Utils
 
-// ==============================================================================================
-// NOTE: new task end reasons MUST be accompanied with serialization logic in util.JsonProtocol!
-// ==============================================================================================
-
 /**
  * :: DeveloperApi ::
  * Various possible reasons why a task ended. The low-level TaskScheduler is supposed to retry

http://git-wip-us.apache.org/repos/asf/spark/blob/118ebd40/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 311bb59..f742c39 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -364,10 +364,6 @@ private[spark] object JsonProtocol {
         ("Metrics" -> metrics)
       case ExecutorLostFailure(executorId) =>
         ("Executor ID" -> executorId)
-      case taskCommitDenied: TaskCommitDenied =>
-        ("Job ID" -> taskCommitDenied.jobID) ~
-        ("Partition ID" -> taskCommitDenied.partitionID) ~
-        ("Attempt Number" -> taskCommitDenied.attemptNumber)
       case _ => Utils.emptyJson
     }
     ("Reason" -> reason) ~ json
@@ -773,7 +769,6 @@ private[spark] object JsonProtocol {
     val exceptionFailure = Utils.getFormattedClassName(ExceptionFailure)
     val taskResultLost = Utils.getFormattedClassName(TaskResultLost)
     val taskKilled = Utils.getFormattedClassName(TaskKilled)
-    val taskCommitDenied = Utils.getFormattedClassName(TaskCommitDenied)
     val executorLostFailure = Utils.getFormattedClassName(ExecutorLostFailure)
     val unknownReason = Utils.getFormattedClassName(UnknownReason)
 
@@ -798,14 +793,6 @@ private[spark] object JsonProtocol {
         ExceptionFailure(className, description, stackTrace, fullStackTrace, metrics, None)
       case `taskResultLost` => TaskResultLost
       case `taskKilled` => TaskKilled
-      case `taskCommitDenied` =>
-        // Unfortunately, the `TaskCommitDenied` message was introduced in 1.3.0 but the JSON
-        // de/serialization logic was not added until 1.5.1. To provide backward compatibility
-        // for reading those logs, we need to provide default values for all the fields.
-        val jobId = Utils.jsonOption(json \ "Job ID").map(_.extract[Int]).getOrElse(-1)
-        val partitionId = Utils.jsonOption(json \ "Partition ID").map(_.extract[Int]).getOrElse(-1)
-        val attemptNo = Utils.jsonOption(json \ "Attempt Number").map(_.extract[Int]).getOrElse(-1)
-        TaskCommitDenied(jobId, partitionId, attemptNo)
       case `executorLostFailure` =>
         val executorId = Utils.jsonOption(json \ "Executor ID").map(_.extract[String])
         ExecutorLostFailure(executorId.getOrElse("Unknown"))

http://git-wip-us.apache.org/repos/asf/spark/blob/118ebd40/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 373c36b..4bf6660 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -152,7 +152,6 @@ class JsonProtocolSuite extends SparkFunSuite {
     testTaskEndReason(TaskResultLost)
     testTaskEndReason(TaskKilled)
     testTaskEndReason(ExecutorLostFailure("100"))
-    testTaskEndReason(TaskCommitDenied(2, 3, 4))
     testTaskEndReason(UnknownReason)
 
     // BlockId
@@ -353,17 +352,6 @@ class JsonProtocolSuite extends SparkFunSuite {
     assertEquals(expectedStageInfo, JsonProtocol.stageInfoFromJson(oldStageInfo))
   }
 
-  // `TaskCommitDenied` was added in 1.3.0 but JSON de/serialization logic was added in 1.5.1
-  test("TaskCommitDenied backward compatibility") {
-    val denied = TaskCommitDenied(1, 2, 3)
-    val oldDenied = JsonProtocol.taskEndReasonToJson(denied)
-      .removeField({ _._1 == "Job ID" })
-      .removeField({ _._1 == "Partition ID" })
-      .removeField({ _._1 == "Attempt Number" })
-    val expectedDenied = TaskCommitDenied(-1, -1, -1)
-    assertEquals(expectedDenied, JsonProtocol.taskEndReasonFromJson(oldDenied))
-  }
-
   /** -------------------------- *
    | Helper test running methods |
    * --------------------------- */
@@ -589,17 +577,7 @@ class JsonProtocolSuite extends SparkFunSuite {
         assertOptionEquals(r1.metrics, r2.metrics, assertTaskMetricsEquals)
       case (TaskResultLost, TaskResultLost) =>
       case (TaskKilled, TaskKilled) =>
-<<<<<<< HEAD
       case (ExecutorLostFailure(execId1), ExecutorLostFailure(execId2)) =>
-=======
-      case (TaskCommitDenied(jobId1, partitionId1, attemptNumber1),
-          TaskCommitDenied(jobId2, partitionId2, attemptNumber2)) =>
-        assert(jobId1 === jobId2)
-        assert(partitionId1 === partitionId2)
-        assert(attemptNumber1 === attemptNumber2)
-      case (ExecutorLostFailure(execId1, isNormalExit1),
-          ExecutorLostFailure(execId2, isNormalExit2)) =>
->>>>>>> 61d4c07... [SPARK-10640] History server fails to parse TaskCommitDenied
         assert(execId1 === execId2)
       case (UnknownReason, UnknownReason) =>
       case _ => fail("Task end reasons don't match in types!")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org