You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by an...@apache.org on 2015/09/23 02:11:11 UTC
spark git commit: Revert "[SPARK-10640] History server fails to parse TaskCommitDenied"
Repository: spark
Updated Branches:
refs/heads/branch-1.5 5ffd0841e -> 118ebd405
Revert "[SPARK-10640] History server fails to parse TaskCommitDenied"
This reverts commit 5ffd0841e016301807b0a008af7c3346e9f59e7a.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/118ebd40
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/118ebd40
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/118ebd40
Branch: refs/heads/branch-1.5
Commit: 118ebd405a34acedb32e9f3d1cf7b5a835e17dbb
Parents: 5ffd084
Author: Andrew Or <an...@databricks.com>
Authored: Tue Sep 22 17:10:58 2015 -0700
Committer: Andrew Or <an...@databricks.com>
Committed: Tue Sep 22 17:10:58 2015 -0700
----------------------------------------------------------------------
.../scala/org/apache/spark/TaskEndReason.scala | 6 +-----
.../org/apache/spark/util/JsonProtocol.scala | 13 ------------
.../apache/spark/util/JsonProtocolSuite.scala | 22 --------------------
3 files changed, 1 insertion(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/118ebd40/core/src/main/scala/org/apache/spark/TaskEndReason.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
index b50354c..7a690df 100644
--- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala
+++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -17,17 +17,13 @@
package org.apache.spark
-import java.io.{ObjectInputStream, ObjectOutputStream}
+import java.io.{IOException, ObjectInputStream, ObjectOutputStream}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util.Utils
-// ==============================================================================================
-// NOTE: new task end reasons MUST be accompanied with serialization logic in util.JsonProtocol!
-// ==============================================================================================
-
/**
* :: DeveloperApi ::
* Various possible reasons why a task ended. The low-level TaskScheduler is supposed to retry
http://git-wip-us.apache.org/repos/asf/spark/blob/118ebd40/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 311bb59..f742c39 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -364,10 +364,6 @@ private[spark] object JsonProtocol {
("Metrics" -> metrics)
case ExecutorLostFailure(executorId) =>
("Executor ID" -> executorId)
- case taskCommitDenied: TaskCommitDenied =>
- ("Job ID" -> taskCommitDenied.jobID) ~
- ("Partition ID" -> taskCommitDenied.partitionID) ~
- ("Attempt Number" -> taskCommitDenied.attemptNumber)
case _ => Utils.emptyJson
}
("Reason" -> reason) ~ json
@@ -773,7 +769,6 @@ private[spark] object JsonProtocol {
val exceptionFailure = Utils.getFormattedClassName(ExceptionFailure)
val taskResultLost = Utils.getFormattedClassName(TaskResultLost)
val taskKilled = Utils.getFormattedClassName(TaskKilled)
- val taskCommitDenied = Utils.getFormattedClassName(TaskCommitDenied)
val executorLostFailure = Utils.getFormattedClassName(ExecutorLostFailure)
val unknownReason = Utils.getFormattedClassName(UnknownReason)
@@ -798,14 +793,6 @@ private[spark] object JsonProtocol {
ExceptionFailure(className, description, stackTrace, fullStackTrace, metrics, None)
case `taskResultLost` => TaskResultLost
case `taskKilled` => TaskKilled
- case `taskCommitDenied` =>
- // Unfortunately, the `TaskCommitDenied` message was introduced in 1.3.0 but the JSON
- // de/serialization logic was not added until 1.5.1. To provide backward compatibility
- // for reading those logs, we need to provide default values for all the fields.
- val jobId = Utils.jsonOption(json \ "Job ID").map(_.extract[Int]).getOrElse(-1)
- val partitionId = Utils.jsonOption(json \ "Partition ID").map(_.extract[Int]).getOrElse(-1)
- val attemptNo = Utils.jsonOption(json \ "Attempt Number").map(_.extract[Int]).getOrElse(-1)
- TaskCommitDenied(jobId, partitionId, attemptNo)
case `executorLostFailure` =>
val executorId = Utils.jsonOption(json \ "Executor ID").map(_.extract[String])
ExecutorLostFailure(executorId.getOrElse("Unknown"))
http://git-wip-us.apache.org/repos/asf/spark/blob/118ebd40/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 373c36b..4bf6660 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -152,7 +152,6 @@ class JsonProtocolSuite extends SparkFunSuite {
testTaskEndReason(TaskResultLost)
testTaskEndReason(TaskKilled)
testTaskEndReason(ExecutorLostFailure("100"))
- testTaskEndReason(TaskCommitDenied(2, 3, 4))
testTaskEndReason(UnknownReason)
// BlockId
@@ -353,17 +352,6 @@ class JsonProtocolSuite extends SparkFunSuite {
assertEquals(expectedStageInfo, JsonProtocol.stageInfoFromJson(oldStageInfo))
}
- // `TaskCommitDenied` was added in 1.3.0 but JSON de/serialization logic was added in 1.5.1
- test("TaskCommitDenied backward compatibility") {
- val denied = TaskCommitDenied(1, 2, 3)
- val oldDenied = JsonProtocol.taskEndReasonToJson(denied)
- .removeField({ _._1 == "Job ID" })
- .removeField({ _._1 == "Partition ID" })
- .removeField({ _._1 == "Attempt Number" })
- val expectedDenied = TaskCommitDenied(-1, -1, -1)
- assertEquals(expectedDenied, JsonProtocol.taskEndReasonFromJson(oldDenied))
- }
-
/** -------------------------- *
| Helper test running methods |
* --------------------------- */
@@ -589,17 +577,7 @@ class JsonProtocolSuite extends SparkFunSuite {
assertOptionEquals(r1.metrics, r2.metrics, assertTaskMetricsEquals)
case (TaskResultLost, TaskResultLost) =>
case (TaskKilled, TaskKilled) =>
-<<<<<<< HEAD
case (ExecutorLostFailure(execId1), ExecutorLostFailure(execId2)) =>
-=======
- case (TaskCommitDenied(jobId1, partitionId1, attemptNumber1),
- TaskCommitDenied(jobId2, partitionId2, attemptNumber2)) =>
- assert(jobId1 === jobId2)
- assert(partitionId1 === partitionId2)
- assert(attemptNumber1 === attemptNumber2)
- case (ExecutorLostFailure(execId1, isNormalExit1),
- ExecutorLostFailure(execId2, isNormalExit2)) =>
->>>>>>> 61d4c07... [SPARK-10640] History server fails to parse TaskCommitDenied
assert(execId1 === execId2)
case (UnknownReason, UnknownReason) =>
case _ => fail("Task end reasons don't match in types!")
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org