You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2020/06/29 04:10:52 UTC

[spark] branch branch-3.0 updated: [SPARK-32124][CORE][SHS] Fix taskEndReasonFromJson to handle event logs from old Spark versions

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new fb8574a  [SPARK-32124][CORE][SHS] Fix taskEndReasonFromJson to handle event logs from old Spark versions
fb8574a is described below

commit fb8574a468be62cccd8b3598134d96bb4f217abb
Author: Warren Zhu <zh...@microsoft.com>
AuthorDate: Sun Jun 28 21:06:45 2020 -0700

    [SPARK-32124][CORE][SHS] Fix taskEndReasonFromJson to handle event logs from old Spark versions
    
    ### What changes were proposed in this pull request?
    Fix a bug where an exception was thrown when parsing the event log of a fetch-failed task end reason that lacks a `Map Index` field
    
    ### Why are the changes needed?
    When the Spark history server reads an event log produced by an older Spark version such as 2.4 (whose logs don't have the `Map Index` field), parsing of TaskEndReason will fail. This will cause the TaskEnd event to be ignored.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    JsonProtocolSuite.test("FetchFailed Map Index backwards compatibility")
    
    Closes #28941 from warrenzhu25/shs-task.
    
    Authored-by: Warren Zhu <zh...@microsoft.com>
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
    (cherry picked from commit 197ac3b13239d50a1f34a5860940e353ca6b99d5)
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
 core/src/main/scala/org/apache/spark/util/JsonProtocol.scala  |  5 ++++-
 .../test/scala/org/apache/spark/util/JsonProtocolSuite.scala  | 11 +++++++++++
 2 files changed, 15 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index d53ca0f..74ff5c7 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -993,7 +993,10 @@ private[spark] object JsonProtocol {
         val blockManagerAddress = blockManagerIdFromJson(json \ "Block Manager Address")
         val shuffleId = (json \ "Shuffle ID").extract[Int]
         val mapId = (json \ "Map ID").extract[Long]
-        val mapIndex = (json \ "Map Index").extract[Int]
+        val mapIndex = (json \ "Map Index") match {
+          case JNothing => 0
+          case x => x.extract[Int]
+        }
         val reduceId = (json \ "Reduce ID").extract[Int]
         val message = jsonOption(json \ "Message").map(_.extract[String])
         new FetchFailed(blockManagerAddress, shuffleId, mapId, mapIndex, reduceId,
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 5f1c753..98aaa9e 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -305,6 +305,17 @@ class JsonProtocolSuite extends SparkFunSuite {
     assert(expectedFetchFailed === JsonProtocol.taskEndReasonFromJson(oldEvent))
   }
 
+  test("SPARK-32124: FetchFailed Map Index backwards compatibility") {
+    // FetchFailed in Spark 2.4.0 does not have "Map Index" property.
+    val fetchFailed = FetchFailed(BlockManagerId("With or", "without you", 15), 17, 16L, 18, 19,
+      "ignored")
+    val oldEvent = JsonProtocol.taskEndReasonToJson(fetchFailed)
+      .removeField({ _._1 == "Map Index" })
+    val expectedFetchFailed = FetchFailed(BlockManagerId("With or", "without you", 15), 17, 16L,
+      0, 19, "ignored")
+    assert(expectedFetchFailed === JsonProtocol.taskEndReasonFromJson(oldEvent))
+  }
+
   test("ShuffleReadMetrics: Local bytes read backwards compatibility") {
     // Metrics about local shuffle bytes read were added in 1.3.1.
     val metrics = makeTaskMetrics(1L, 2L, 3L, 4L, 5, 6,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org