Posted to commits@spark.apache.org by jo...@apache.org on 2017/07/18 03:40:35 UTC

spark git commit: [SPARK-21444] Be more defensive when removing broadcasts in MapOutputTracker

Repository: spark
Updated Branches:
  refs/heads/master e9faae135 -> 5952ad2b4


[SPARK-21444] Be more defensive when removing broadcasts in MapOutputTracker

## What changes were proposed in this pull request?

In SPARK-21444, sitalkedia reported an issue where the `Broadcast.destroy()` call in `MapOutputTracker`'s `ShuffleStatus.invalidateSerializedMapOutputStatusCache()` was failing with an `IOException`, causing the DAGScheduler to crash and bring down the entire driver.

This is a bug introduced by #17955. In the old code, we removed a broadcast variable by calling `BroadcastManager.unbroadcast` with `blocking=false`, but the new code simply calls `Broadcast.destroy()`, which can fail with an `IOException` if certain blocking RPCs time out.
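
For illustration only — this is not Spark's RPC code, and `sendCleanupRpc` is a made-up stand-in — the practical difference between a blocking and a fire-and-forget cleanup call looks roughly like this:

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future, TimeoutException}
import scala.concurrent.duration._

// Hypothetical sketch: a blocking cleanup waits for the RPC reply and can throw on timeout,
// while a non-blocking cleanup fires the request and returns immediately.
object BlockingVsNonBlockingCleanup {
  implicit val ec: ExecutionContext =
    ExecutionContext.fromExecutor(Executors.newFixedThreadPool(2))

  // Stands in for a cleanup RPC sent to an executor that never answers.
  def sendCleanupRpc(): Future[Unit] = Future { Thread.sleep(10000) }

  def main(args: Array[String]): Unit = {
    // Non-blocking: don't wait for the reply, so a dead executor can't stall or fail the caller.
    sendCleanupRpc()
    println("non-blocking cleanup returned immediately")

    // Blocking: waiting for the reply surfaces timeouts as exceptions in the calling thread.
    try {
      Await.result(sendCleanupRpc(), 100.millis)
    } catch {
      case e: TimeoutException => println(s"blocking cleanup failed: $e")
    }
    sys.exit(0)
  }
}
```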

The fix implemented here is to replace this with a call to `destroy(blocking = false)` and to wrap the entire operation in `Utils.tryLogNonFatalError`.
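
As a minimal standalone sketch of that defensive pattern (this is not Spark's actual `Utils.tryLogNonFatalError`; `logError` and `cleanupBroadcast` below are illustrative placeholders):

```scala
import scala.util.control.NonFatal

// Sketch of "log non-fatal errors instead of propagating them" around a best-effort cleanup.
object DefensiveCleanup {
  private def logError(msg: String, e: Throwable): Unit =
    System.err.println(s"$msg: $e")

  /** Run `block`, logging (rather than rethrowing) any non-fatal error. */
  def tryLogNonFatalError(block: => Unit): Unit = {
    try {
      block
    } catch {
      case NonFatal(e) =>
        logError("Uncaught exception in thread " + Thread.currentThread().getName, e)
    }
  }

  def main(args: Array[String]): Unit = {
    // A cleanup step that fails, standing in for a broadcast destroy() whose RPCs time out.
    def cleanupBroadcast(): Unit = throw new java.io.IOException("cleanup RPC timed out")

    tryLogNonFatalError {
      cleanupBroadcast()
    }
    println("caller keeps running despite the failed cleanup")
  }
}
```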

## How was this patch tested?

I haven't written regression tests for this because it's really hard to inject mocks to simulate RPC failures here. Instead, this class of issue is probably best uncovered with more generalized error injection / network unreliability / fuzz testing tools.

Author: Josh Rosen <jo...@databricks.com>

Closes #18662 from JoshRosen/SPARK-21444.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5952ad2b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5952ad2b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5952ad2b

Branch: refs/heads/master
Commit: 5952ad2b40c82c0ccb2ec16fa09071bf198ff99d
Parents: e9faae1
Author: Josh Rosen <jo...@databricks.com>
Authored: Mon Jul 17 20:40:32 2017 -0700
Committer: Josh Rosen <jo...@databricks.com>
Committed: Mon Jul 17 20:40:32 2017 -0700

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/MapOutputTracker.scala | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5952ad2b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 5d48bc7..7f760a5 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -194,7 +194,12 @@ private class ShuffleStatus(numPartitions: Int) {
    */
   def invalidateSerializedMapOutputStatusCache(): Unit = synchronized {
     if (cachedSerializedBroadcast != null) {
-      cachedSerializedBroadcast.destroy()
+      // Prevent errors during broadcast cleanup from crashing the DAGScheduler (see SPARK-21444)
+      Utils.tryLogNonFatalError {
+        // Use `blocking = false` so that this operation doesn't hang while trying to send cleanup
+        // RPCs to dead executors.
+        cachedSerializedBroadcast.destroy(blocking = false)
+      }
       cachedSerializedBroadcast = null
     }
     cachedSerializedMapStatus = null

