You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ka...@apache.org on 2020/07/08 00:39:55 UTC

[spark] branch branch-3.0 updated: [SPARK-32124][CORE][FOLLOW-UP] Use the invalid value Int.MinValue to fill the map index when the event logs from the old Spark version

This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 9237fb2  [SPARK-32124][CORE][FOLLOW-UP] Use the invalid value Int.MinValue to fill the map index when the event logs from the old Spark version
9237fb2 is described below

commit 9237fb2ca90d16208634d70cdff1a0ea9ddcce26
Author: Yuanjian Li <xy...@gmail.com>
AuthorDate: Wed Jul 8 09:36:06 2020 +0900

    [SPARK-32124][CORE][FOLLOW-UP] Use the invalid value Int.MinValue to fill the map index when the event logs from the old Spark version
    
    ### What changes were proposed in this pull request?
    Use the invalid value Int.MinValue to fill the map index when the event logs were written by an old Spark version.
    
    ### Why are the changes needed?
    Follow up PR for #28941.
    
    ### Does this PR introduce _any_ user-facing change?
    When the Spark 3.0 history server reads an event log written by an older Spark version, the invalid value Int.MinValue (rather than 0) is used to fill the missing map index.
    
    ### How was this patch tested?
    Existing UT.
    
    Closes #28965 from xuanyuanking/follow-up.
    
    Authored-by: Yuanjian Li <xy...@gmail.com>
    Signed-off-by: Jungtaek Lim (HeartSaVioR) <ka...@gmail.com>
    (cherry picked from commit 365961155a655f19c9184b16ccd493838c848213)
    Signed-off-by: Jungtaek Lim (HeartSaVioR) <ka...@gmail.com>
---
 core/src/main/scala/org/apache/spark/TaskEndReason.scala          | 3 ++-
 core/src/main/scala/org/apache/spark/util/JsonProtocol.scala      | 8 ++++++--
 core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala | 2 +-
 3 files changed, 9 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
index b13028f..6606d31 100644
--- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala
+++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -90,7 +90,8 @@ case class FetchFailed(
   extends TaskFailedReason {
   override def toErrorString: String = {
     val bmAddressString = if (bmAddress == null) "null" else bmAddress.toString
-    s"FetchFailed($bmAddressString, shuffleId=$shuffleId, mapIndex=$mapIndex, " +
+    val mapIndexString = if (mapIndex == Int.MinValue) "Unknown" else mapIndex.toString
+    s"FetchFailed($bmAddressString, shuffleId=$shuffleId, mapIndex=$mapIndexString, " +
       s"mapId=$mapId, reduceId=$reduceId, message=\n$message\n)"
   }
 
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 74ff5c7..78fbd0c 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -993,8 +993,12 @@ private[spark] object JsonProtocol {
         val blockManagerAddress = blockManagerIdFromJson(json \ "Block Manager Address")
         val shuffleId = (json \ "Shuffle ID").extract[Int]
         val mapId = (json \ "Map ID").extract[Long]
-        val mapIndex = (json \ "Map Index") match {
-          case JNothing => 0
+        val mapIndex = json \ "Map Index" match {
+          case JNothing =>
+            // Note, we use the invalid value Int.MinValue here to fill the map index for backward
+            // compatibility. Otherwise, the fetch failed event will be dropped when the history
+            // server loads the event log written by the Spark version before 3.0.
+            Int.MinValue
           case x => x.extract[Int]
         }
         val reduceId = (json \ "Reduce ID").extract[Int]
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 98aaa9e..b77cd81 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -312,7 +312,7 @@ class JsonProtocolSuite extends SparkFunSuite {
     val oldEvent = JsonProtocol.taskEndReasonToJson(fetchFailed)
       .removeField({ _._1 == "Map Index" })
     val expectedFetchFailed = FetchFailed(BlockManagerId("With or", "without you", 15), 17, 16L,
-      0, 19, "ignored")
+      Int.MinValue, 19, "ignored")
     assert(expectedFetchFailed === JsonProtocol.taskEndReasonFromJson(oldEvent))
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org