Posted to commits@spark.apache.org by jo...@apache.org on 2016/09/22 18:05:50 UTC

spark git commit: [SPARK-17485] Prevent failed remote reads of cached blocks from failing entire job (branch-1.6 backport)

Repository: spark
Updated Branches:
  refs/heads/branch-1.6 ce0a222f5 -> 94524cef4


[SPARK-17485] Prevent failed remote reads of cached blocks from failing entire job (branch-1.6 backport)

This patch is a branch-1.6 backport of #15037:

## What changes were proposed in this pull request?

In Spark's `RDD.getOrCompute` we first try to read a local copy of a cached RDD block, then a remote copy, and fall back to recomputing the block only if no cached copy (local or remote) can be read. This logic works correctly when no remote copies of the block exist. However, if there _are_ remote copies and reads of those copies fail (due to network issues or internal Spark bugs), then the BlockManager throws a `BlockFetchException`, which fails the task and could fail the whole job if the read failures keep occurring.

In the cases of TorrentBroadcast and task result fetching we really do want to fail the entire job if no remote blocks can be fetched, but this logic is inappropriate for reads of cached RDD blocks because those can and should be recomputed when cached blocks are unavailable.

Therefore, I think that the `BlockManager.getRemoteBytes()` method should never throw on remote fetch errors and, instead, should handle failures by returning `None`.
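
As a rough illustration of the proposed behavior (not the actual `BlockManager` code), the sketch below tries each location in turn and returns `None` once every attempt has failed, instead of throwing. The object name `RemoteFetchSketch`, the `fetch` function parameter, and the use of plain strings for locations are assumptions made for this example only.

```scala
import scala.util.control.NonFatal

// Hypothetical, simplified stand-in for the remote-fetch loop; not Spark's API.
object RemoteFetchSketch {

  /** Tries each location in order and returns None if no attempt yields data. */
  def getRemoteBytes(
      locations: Seq[String],
      fetch: String => Array[Byte]): Option[Array[Byte]] = {
    var result: Option[Array[Byte]] = None
    var numFetchFailures = 0
    val remaining = locations.iterator
    while (result.isEmpty && remaining.hasNext) {
      val loc = remaining.next()
      try {
        // Option(...) maps a null payload to None, so the loop moves on.
        result = Option(fetch(loc))
      } catch {
        case NonFatal(e) =>
          numFetchFailures += 1
          if (numFetchFailures == locations.size) {
            // Every location threw: log a warning rather than rethrowing.
            System.err.println(
              s"Failed to fetch block from ${locations.size} locations. " +
                s"Most recent failure cause: $e")
          }
      }
    }
    result
  }
}
```

With this shape, a caller cannot distinguish "no locations" from "all locations failed"; both surface as `None`, which is the point of the change for cached RDD blocks.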

## How was this patch tested?

Block manager changes should be covered by modified tests in `BlockManagerSuite`: the old tests expected exceptions to be thrown on failed remote reads, while the modified tests now expect `None` to be returned from the `getRemote*` methods.

I also manually inspected all usages of `BlockManager.getRemoteValues()`, `getRemoteBytes()`, and `get()` to verify that they correctly pattern-match on the result and handle `None`. Note that these `None` branches are already exercised because the old `getRemoteBytes` returned `None` when no remote locations for the block could be found (which could occur if an executor died and its block manager de-registered with the master).
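
The caller-side pattern described above might look roughly like the following sketch, in which a `None` from either lookup leads to recomputation rather than a task failure. The object name `GetOrComputeSketch` and the by-name parameters `local`, `remote`, and `compute` are hypothetical and do not reflect the real `getOrCompute` signature.

```scala
// Hypothetical sketch of the local -> remote -> recompute fallback chain.
object GetOrComputeSketch {

  /**
   * Returns the locally cached value if present, else the remote one,
   * else recomputes. All three arguments are evaluated lazily, so the
   * remote read and the recomputation only run when they are needed.
   */
  def getOrCompute[T](local: => Option[T], remote: => Option[T], compute: => T): T =
    local.orElse(remote) match {
      case Some(cached) => cached
      case None => compute // no readable cached copy anywhere
    }
}
```

For example, `GetOrComputeSketch.getOrCompute(None, None, 42)` falls through both lookups and returns the recomputed value.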

Author: Josh Rosen <jo...@databricks.com>

Closes #15186 from JoshRosen/SPARK-17485-branch-1.6-backport.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/94524cef
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/94524cef
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/94524cef

Branch: refs/heads/branch-1.6
Commit: 94524cef4cf367a0e73ebe0e919cc21f25f1043f
Parents: ce0a222
Author: Josh Rosen <jo...@databricks.com>
Authored: Thu Sep 22 11:05:35 2016 -0700
Committer: Josh Rosen <jo...@databricks.com>
Committed: Thu Sep 22 11:05:35 2016 -0700

----------------------------------------------------------------------
 .../spark/storage/BlockFetchException.scala     | 24 --------------------
 .../org/apache/spark/storage/BlockManager.scala |  3 ++-
 .../spark/storage/BlockManagerSuite.scala       |  7 +++---
 3 files changed, 5 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/94524cef/core/src/main/scala/org/apache/spark/storage/BlockFetchException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockFetchException.scala b/core/src/main/scala/org/apache/spark/storage/BlockFetchException.scala
deleted file mode 100644
index f6e46ae..0000000
--- a/core/src/main/scala/org/apache/spark/storage/BlockFetchException.scala
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.storage
-
-import org.apache.spark.SparkException
-
-private[spark]
-case class BlockFetchException(messages: String, throwable: Throwable)
-  extends SparkException(messages, throwable)

http://git-wip-us.apache.org/repos/asf/spark/blob/94524cef/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 339ee144..1fc6f39 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -602,8 +602,9 @@ private[spark] class BlockManager(
           numFetchFailures += 1
           if (numFetchFailures == locations.size) {
             // An exception is thrown while fetching this block from all locations
-            throw new BlockFetchException(s"Failed to fetch block from" +
+            logWarning(s"Failed to fetch block from" +
               s" ${locations.size} locations. Most recent failure cause:", e)
+            return None
           } else {
             // This location failed, so we retry fetch from a different one by returning null here
             logWarning(s"Failed to fetch remote block $blockId " +

http://git-wip-us.apache.org/repos/asf/spark/blob/94524cef/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 47e8545..fc76b91 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -509,10 +509,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
       assert(list1Get.isDefined, "list1Get expected to be fetched")
       store3.stop()
       store3 = null
-      // exception throw because there is no locations
-      intercept[BlockFetchException] {
-        list1Get = store.getRemoteBytes("list1")
-      }
+      // Fetch should fail because there are no locations, but no exception should be thrown
+      list1Get = store.getRemoteBytes("list1")
+      assert(list1Get.isEmpty, "list1Get expected to fail")
     } finally {
       origTimeoutOpt match {
         case Some(t) => conf.set("spark.network.timeout", t)

