You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2021/04/04 01:38:36 UTC

[spark] branch branch-3.1 updated: [SPARK-34939][CORE] Throw fetch failure exception when unable to deserialize broadcasted map statuses

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 7375da2  [SPARK-34939][CORE] Throw fetch failure exception when unable to deserialize broadcasted map statuses
7375da2 is described below

commit 7375da245ae62b37f32419672995c2e4c2c904ee
Author: Liang-Chi Hsieh <vi...@gmail.com>
AuthorDate: Sat Apr 3 18:37:50 2021 -0700

    [SPARK-34939][CORE] Throw fetch failure exception when unable to deserialize broadcasted map statuses
    
    ### What changes were proposed in this pull request?
    
    This patch catches the `IOException` that can be thrown when deserializing broadcasted map statuses fails (e.g., because the broadcasted value has been destroyed). Once the `IOException` is caught, it is wrapped in a `SparkException`, which `MapOutputTrackerWorker` converts into a `MetadataFetchFailedException` so that Spark can handle it.
    
    ### Why are the changes needed?
    
    One customer encountered an application error. From the log, it is caused by accessing a non-existent broadcasted value. The broadcasted value is the map statuses. E.g.,
    
    ```
    [info]   Cause: java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_0_piece0 of broadcast_0
    [info]   at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1410)
    [info]   at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:226)
    [info]   at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:103)
    [info]   at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
    [info]   at org.apache.spark.MapOutputTracker$.$anonfun$deserializeMapStatuses$3(MapOutputTracker.scala:967)
    [info]   at org.apache.spark.internal.Logging.logInfo(Logging.scala:57)
    [info]   at org.apache.spark.internal.Logging.logInfo$(Logging.scala:56)
    [info]   at org.apache.spark.MapOutputTracker$.logInfo(MapOutputTracker.scala:887)
    [info]   at org.apache.spark.MapOutputTracker$.deserializeMapStatuses(MapOutputTracker.scala:967)
    ```
    
    There is a race condition. After the map statuses are broadcasted, the executors obtain the serialized broadcasted map statuses. If any fetch failure happens afterwards, the Spark scheduler invalidates the cached map statuses and destroys the broadcasted value of the map statuses. Then, when any executor tries to deserialize the serialized broadcasted map statuses and access the broadcasted value, an `IOException` is thrown. Currently we don't catch it in `MapOutputTrackerWorker` and the above exception will fail the appl [...]
    
    Normally we should throw a fetch failure exception in such a case. The Spark scheduler will handle this.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Unit test.
    
    Closes #32033 from viirya/fix-broadcast-master.
    
    Authored-by: Liang-Chi Hsieh <vi...@gmail.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
    (cherry picked from commit 571acc87fef6ddf8a6046bf710d5065dc02d76bd)
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../scala/org/apache/spark/MapOutputTracker.scala  | 33 +++++++++++------
 .../org/apache/spark/MapOutputTrackerSuite.scala   | 41 ++++++++++++++++++++++
 2 files changed, 64 insertions(+), 10 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index cdec198..ce71c2c 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark
 
-import java.io.{ByteArrayInputStream, ObjectInputStream, ObjectOutputStream}
+import java.io.{ByteArrayInputStream, IOException, ObjectInputStream, ObjectOutputStream}
 import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}
 import java.util.concurrent.locks.ReentrantReadWriteLock
 
@@ -100,7 +100,7 @@ private class ShuffleStatus(numPartitions: Int) extends Logging {
    * broadcast variable in order to keep it from being garbage collected and to allow for it to be
    * explicitly destroyed later on when the ShuffleMapStage is garbage-collected.
    */
-  private[this] var cachedSerializedBroadcast: Broadcast[Array[Byte]] = _
+  private[spark] var cachedSerializedBroadcast: Broadcast[Array[Byte]] = _
 
   /**
    * Counter tracking the number of partitions that have output. This is a performance optimization
@@ -843,7 +843,14 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr
         if (fetchedStatuses == null) {
           logInfo("Doing the fetch; tracker endpoint = " + trackerEndpoint)
           val fetchedBytes = askTracker[Array[Byte]](GetMapOutputStatuses(shuffleId))
-          fetchedStatuses = MapOutputTracker.deserializeMapStatuses(fetchedBytes, conf)
+          try {
+            fetchedStatuses = MapOutputTracker.deserializeMapStatuses(fetchedBytes, conf)
+          } catch {
+            case e: SparkException =>
+              throw new MetadataFetchFailedException(shuffleId, -1,
+                s"Unable to deserialize broadcasted map statuses for shuffle $shuffleId: " +
+                  e.getCause)
+          }
           logInfo("Got the output locations")
           mapStatuses.put(shuffleId, fetchedStatuses)
         }
@@ -953,13 +960,19 @@ private[spark] object MapOutputTracker extends Logging {
       case DIRECT =>
         deserializeObject(bytes, 1, bytes.length - 1).asInstanceOf[Array[MapStatus]]
       case BROADCAST =>
-        // deserialize the Broadcast, pull .value array out of it, and then deserialize that
-        val bcast = deserializeObject(bytes, 1, bytes.length - 1).
-          asInstanceOf[Broadcast[Array[Byte]]]
-        logInfo("Broadcast mapstatuses size = " + bytes.length +
-          ", actual size = " + bcast.value.length)
-        // Important - ignore the DIRECT tag ! Start from offset 1
-        deserializeObject(bcast.value, 1, bcast.value.length - 1).asInstanceOf[Array[MapStatus]]
+        try {
+          // deserialize the Broadcast, pull .value array out of it, and then deserialize that
+          val bcast = deserializeObject(bytes, 1, bytes.length - 1).
+            asInstanceOf[Broadcast[Array[Byte]]]
+          logInfo("Broadcast mapstatuses size = " + bytes.length +
+            ", actual size = " + bcast.value.length)
+          // Important - ignore the DIRECT tag ! Start from offset 1
+          deserializeObject(bcast.value, 1, bcast.value.length - 1).asInstanceOf[Array[MapStatus]]
+        } catch {
+          case e: IOException =>
+            logWarning("Exception encountered during deserializing broadcasted map statuses: ", e)
+            throw new SparkException("Unable to deserialize broadcasted map statuses", e)
+        }
       case _ => throw new IllegalArgumentException("Unexpected byte tag = " + bytes(0))
     }
   }
diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
index b5b68f6..33e1113 100644
--- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -333,4 +333,45 @@ class MapOutputTrackerSuite extends SparkFunSuite {
     rpcEnv.shutdown()
   }
 
+  test("SPARK-34939: remote fetch using broadcast if broadcasted value is destroyed") {
+    val newConf = new SparkConf
+    newConf.set(RPC_MESSAGE_MAX_SIZE, 1)
+    newConf.set(RPC_ASK_TIMEOUT, "1") // Fail fast
+    newConf.set(SHUFFLE_MAPOUTPUT_MIN_SIZE_FOR_BROADCAST, 10240L) // 10 KiB << 1MiB framesize
+
+    // needs TorrentBroadcast so need a SparkContext
+    withSpark(new SparkContext("local", "MapOutputTrackerSuite", newConf)) { sc =>
+      val masterTracker = sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
+      val rpcEnv = sc.env.rpcEnv
+      val masterEndpoint = new MapOutputTrackerMasterEndpoint(rpcEnv, masterTracker, newConf)
+      rpcEnv.stop(masterTracker.trackerEndpoint)
+      rpcEnv.setupEndpoint(MapOutputTracker.ENDPOINT_NAME, masterEndpoint)
+
+      masterTracker.registerShuffle(20, 100)
+      (0 until 100).foreach { i =>
+        masterTracker.registerMapOutput(20, i, new CompressedMapStatus(
+          BlockManagerId("999", "mps", 1000), Array.fill[Long](4000000)(0), 5))
+      }
+
+      val mapWorkerRpcEnv = createRpcEnv("spark-worker", "localhost", 0, new SecurityManager(conf))
+      val mapWorkerTracker = new MapOutputTrackerWorker(conf)
+      mapWorkerTracker.trackerEndpoint =
+        mapWorkerRpcEnv.setupEndpointRef(rpcEnv.address, MapOutputTracker.ENDPOINT_NAME)
+
+      val fetchedBytes = mapWorkerTracker.trackerEndpoint
+        .askSync[Array[Byte]](GetMapOutputStatuses(20))
+      assert(fetchedBytes(0) == 1)
+
+      // Normally `unregisterMapOutput` triggers the destroy of broadcasted value.
+      // But the timing of destroying broadcasted value is indeterminate, we manually destroy
+      // it by blocking.
+      masterTracker.shuffleStatuses.get(20).foreach { shuffleStatus =>
+        shuffleStatus.cachedSerializedBroadcast.destroy(true)
+      }
+      val err = intercept[SparkException] {
+        MapOutputTracker.deserializeMapStatuses(fetchedBytes, conf)
+      }
+      assert(err.getMessage.contains("Unable to deserialize broadcasted map statuses"))
+    }
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org