Posted to commits@spark.apache.org by jo...@apache.org on 2016/09/12 22:44:07 UTC

spark git commit: [SPARK-17485] Prevent failed remote reads of cached blocks from failing entire job

Repository: spark
Updated Branches:
  refs/heads/master 7c51b99a4 -> f9c580f11


[SPARK-17485] Prevent failed remote reads of cached blocks from failing entire job

## What changes were proposed in this pull request?

In Spark's `RDD.getOrCompute` we first try to read a local copy of a cached RDD block, then a remote copy, and only fall back to recomputing the block if no cached copy (local or remote) can be read. This logic works correctly when no remote copies of the block exist, but if there _are_ remote copies and reads of those copies fail (due to network issues or internal Spark bugs), the BlockManager throws a `BlockFetchException` that fails the task (and can fail the whole job if the read failures keep occurring).
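
To make that read hierarchy concrete, here is a minimal, self-contained Scala sketch of the fallback chain. The stub readers and simplified signatures below are illustrative only, not Spark's actual `BlockManager` internals:

```scala
// Minimal sketch of the read hierarchy described above; the stubs stand in
// for Spark's real block readers and are NOT its actual API.
object GetOrComputeSketch {
  // Stub: read a locally cached copy of the block, if one exists.
  def readLocal(blockId: String): Option[Array[Byte]] = None
  // Stub: read a remotely cached copy; this is the step whose failures
  // previously surfaced as a BlockFetchException.
  def readRemote(blockId: String): Option[Array[Byte]] = None
  // Stub: recompute the block from its lineage.
  def recompute(blockId: String): Array[Byte] = Array[Byte]()

  // Try a local copy first, then a remote copy, then recompute.
  def getOrCompute(blockId: String): Array[Byte] =
    readLocal(blockId)
      .orElse(readRemote(blockId))
      .getOrElse(recompute(blockId))
}
```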

For TorrentBroadcast and task result fetching we really do want to fail the entire job if no remote blocks can be fetched, but this logic is inappropriate for reads of cached RDD blocks, which can and should be recomputed when cached copies are unavailable.

Therefore, I think that the `BlockManager.getRemoteBytes()` method should never throw on remote fetch errors and, instead, should handle failures by returning `None`.
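
Under that contract, callers that genuinely cannot proceed without the block can still escalate a `None` into an error themselves, while cached-RDD reads fall back to recomputation. A hedged sketch of the two caller styles (the stub `getRemoteBytes` mirrors the proposed return type; `fetchRequiredBlock`, `fetchCachedOrRecompute`, and the error message are hypothetical helpers, not real Spark methods):

```scala
import org.apache.spark.SparkException

// Sketch of the two caller styles under the proposed Option-based contract.
// Everything here is illustrative; only SparkException is a real Spark class.
object RemoteReadCallers {
  // Mirrors the proposed signature: None on any remote fetch failure. (stub)
  def getRemoteBytes(blockId: String): Option[Array[Byte]] = None

  // TorrentBroadcast-style caller: the block is required, so None is fatal.
  def fetchRequiredBlock(blockId: String): Array[Byte] =
    getRemoteBytes(blockId).getOrElse(
      throw new SparkException(s"Failed to get remote block $blockId"))

  // Cached-RDD-style caller: None simply means "recompute the block".
  def fetchCachedOrRecompute(blockId: String, recompute: () => Array[Byte]): Array[Byte] =
    getRemoteBytes(blockId).getOrElse(recompute())
}
```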

## How was this patch tested?

Block manager changes should be covered by modified tests in `BlockManagerSuite`: the old tests expected exceptions to be thrown on failed remote reads, while the modified tests now expect `None` to be returned from the `getRemote*` methods.

I also manually inspected all usages of `BlockManager.getRemoteValues()`, `getRemoteBytes()`, and `get()` to verify that they correctly pattern-match on the result and handle `None`. Note that these `None` branches are already exercised because the old `getRemoteBytes` returned `None` when no remote locations for the block could be found (which could occur if an executor died and its block manager de-registered with the master).

Author: Josh Rosen <jo...@databricks.com>

Closes #15037 from JoshRosen/SPARK-17485.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f9c580f1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f9c580f1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f9c580f1

Branch: refs/heads/master
Commit: f9c580f11098d95f098936a0b90fa21d71021205
Parents: 7c51b99
Author: Josh Rosen <jo...@databricks.com>
Authored: Mon Sep 12 15:43:57 2016 -0700
Committer: Josh Rosen <jo...@databricks.com>
Committed: Mon Sep 12 15:43:57 2016 -0700

----------------------------------------------------------------------
 .../spark/storage/BlockFetchException.scala     | 24 --------------------
 .../org/apache/spark/storage/BlockManager.scala |  5 ++--
 .../spark/storage/BlockManagerSuite.scala       | 10 +++-----
 3 files changed, 6 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f9c580f1/core/src/main/scala/org/apache/spark/storage/BlockFetchException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockFetchException.scala b/core/src/main/scala/org/apache/spark/storage/BlockFetchException.scala
deleted file mode 100644
index f6e46ae..0000000
--- a/core/src/main/scala/org/apache/spark/storage/BlockFetchException.scala
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.storage
-
-import org.apache.spark.SparkException
-
-private[spark]
-case class BlockFetchException(messages: String, throwable: Throwable)
-  extends SparkException(messages, throwable)

http://git-wip-us.apache.org/repos/asf/spark/blob/f9c580f1/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 9e63777..a724fdf 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -559,8 +559,9 @@ private[spark] class BlockManager(
             // Give up trying anymore locations. Either we've tried all of the original locations,
             // or we've refreshed the list of locations from the master, and have still
             // hit failures after trying locations from the refreshed list.
-            throw new BlockFetchException(s"Failed to fetch block after" +
-              s" ${totalFailureCount} fetch failures. Most recent failure cause:", e)
+            logWarning(s"Failed to fetch block after $totalFailureCount fetch failures. " +
+              s"Most recent failure cause:", e)
+            return None
           }
 
           logWarning(s"Failed to fetch remote block $blockId " +

http://git-wip-us.apache.org/repos/asf/spark/blob/f9c580f1/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 87c8628..fdf28b7 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -513,10 +513,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     assert(store.getRemoteBytes("list1").isDefined, "list1Get expected to be fetched")
     store3.stop()
     store3 = null
-    // exception throw because there is no locations
-    intercept[BlockFetchException] {
-      store.getRemoteBytes("list1")
-    }
+    // Should return None instead of throwing an exception:
+    assert(store.getRemoteBytes("list1").isEmpty)
   }
 
   test("SPARK-14252: getOrElseUpdate should still read from remote storage") {
@@ -1186,9 +1184,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
       new MockBlockTransferService(conf.getInt("spark.block.failures.beforeLocationRefresh", 5))
     store = makeBlockManager(8000, "executor1", transferService = Option(mockBlockTransferService))
     store.putSingle("item", 999L, StorageLevel.MEMORY_ONLY, tellMaster = true)
-    intercept[BlockFetchException] {
-      store.getRemoteBytes("item")
-    }
+    assert(store.getRemoteBytes("item").isEmpty)
   }
 
   test("SPARK-13328: refresh block locations (fetch should succeed after location refresh)") {

