You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2014/06/27 06:12:19 UTC

git commit: Removed throwable field from FetchFailedException and added MetadataFetchFailedException

Repository: spark
Updated Branches:
  refs/heads/master 981bde9b0 -> bf578deaf


Removed throwable field from FetchFailedException and added MetadataFetchFailedException

FetchFailedException used to have a Throwable field, but in practice that throwable was never propagated back to the driver: Executor explicitly catches FetchFailedException and sends only FetchFailed as the TaskEndReason.

This pull request removes the Throwable field and adds MetadataFetchFailedException, a subclass of FetchFailedException that MapOutputTracker now throws instead of a plain FetchFailedException.

Author: Reynold Xin <rx...@apache.org>

Closes #1227 from rxin/metadataFetchException and squashes the following commits:

5cb1e0a [Reynold Xin] MetadataFetchFailedException extends FetchFailedException.
8861ee2 [Reynold Xin] Throw MetadataFetchFailedException in MapOutputTracker.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bf578dea
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bf578dea
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bf578dea

Branch: refs/heads/master
Commit: bf578deaf2493081ceeb78dfd7617def5699a06e
Parents: 981bde9
Author: Reynold Xin <rx...@apache.org>
Authored: Thu Jun 26 21:12:16 2014 -0700
Committer: Reynold Xin <rx...@apache.org>
Committed: Thu Jun 26 21:12:16 2014 -0700

----------------------------------------------------------------------
 .../org/apache/spark/FetchFailedException.scala | 45 -----------------
 .../org/apache/spark/MapOutputTracker.scala     | 12 +++--
 .../scala/org/apache/spark/TaskEndReason.scala  |  2 +-
 .../org/apache/spark/executor/Executor.scala    |  2 +-
 .../spark/scheduler/TaskDescription.scala       |  4 ++
 .../spark/shuffle/FetchFailedException.scala    | 52 ++++++++++++++++++++
 .../shuffle/hash/BlockStoreShuffleFetcher.scala |  5 +-
 .../apache/spark/MapOutputTrackerSuite.scala    |  1 +
 8 files changed, 69 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/bf578dea/core/src/main/scala/org/apache/spark/FetchFailedException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/FetchFailedException.scala b/core/src/main/scala/org/apache/spark/FetchFailedException.scala
deleted file mode 100644
index 8eaa26b..0000000
--- a/core/src/main/scala/org/apache/spark/FetchFailedException.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark
-
-import org.apache.spark.storage.BlockManagerId
-
-private[spark] class FetchFailedException(
-    taskEndReason: TaskEndReason,
-    message: String,
-    cause: Throwable)
-  extends Exception {
-
-  def this (bmAddress: BlockManagerId, shuffleId: Int, mapId: Int, reduceId: Int,
-      cause: Throwable) =
-    this(FetchFailed(bmAddress, shuffleId, mapId, reduceId),
-      "Fetch failed: %s %d %d %d".format(bmAddress, shuffleId, mapId, reduceId),
-      cause)
-
-  def this (shuffleId: Int, reduceId: Int, cause: Throwable) =
-    this(FetchFailed(null, shuffleId, -1, reduceId),
-      "Unable to fetch locations from master: %d %d".format(shuffleId, reduceId), cause)
-
-  override def getMessage(): String = message
-
-
-  override def getCause(): Throwable = cause
-
-  def toTaskEndReason: TaskEndReason = taskEndReason
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/bf578dea/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index ee82d9f..182abac 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -25,9 +25,11 @@ import scala.concurrent.Await
 
 import akka.actor._
 import akka.pattern.ask
+
+import org.apache.spark.util._
 import org.apache.spark.scheduler.MapStatus
+import org.apache.spark.shuffle.MetadataFetchFailedException
 import org.apache.spark.storage.BlockManagerId
-import org.apache.spark.util._
 
 private[spark] sealed trait MapOutputTrackerMessage
 private[spark] case class GetMapOutputStatuses(shuffleId: Int)
@@ -168,8 +170,8 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
           return MapOutputTracker.convertMapStatuses(shuffleId, reduceId, fetchedStatuses)
         }
       } else {
-        throw new FetchFailedException(null, shuffleId, -1, reduceId,
-          new Exception("Missing all output locations for shuffle " + shuffleId))
+        throw new MetadataFetchFailedException(
+          shuffleId, reduceId, "Missing all output locations for shuffle " + shuffleId)
       }
     } else {
       statuses.synchronized {
@@ -371,8 +373,8 @@ private[spark] object MapOutputTracker {
     statuses.map {
       status =>
         if (status == null) {
-          throw new FetchFailedException(null, shuffleId, -1, reduceId,
-            new Exception("Missing an output location for shuffle " + shuffleId))
+          throw new MetadataFetchFailedException(
+            shuffleId, reduceId, "Missing an output location for shuffle " + shuffleId)
         } else {
           (status.location, decompressSize(status.compressedSizes(reduceId)))
         }

http://git-wip-us.apache.org/repos/asf/spark/blob/bf578dea/core/src/main/scala/org/apache/spark/TaskEndReason.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
index 5e8bd8c..df42d67 100644
--- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala
+++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -65,7 +65,7 @@ case object Resubmitted extends TaskFailedReason {
  */
 @DeveloperApi
 case class FetchFailed(
-    bmAddress: BlockManagerId,
+    bmAddress: BlockManagerId,  // Note that bmAddress can be null
     shuffleId: Int,
     mapId: Int,
     reduceId: Int)

http://git-wip-us.apache.org/repos/asf/spark/blob/bf578dea/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 557b9a3..4d3ba11 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -26,8 +26,8 @@ import scala.collection.JavaConversions._
 import scala.collection.mutable.HashMap
 
 import org.apache.spark._
-import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.scheduler._
+import org.apache.spark.shuffle.FetchFailedException
 import org.apache.spark.storage.{StorageLevel, TaskResultBlockId}
 import org.apache.spark.util.{AkkaUtils, Utils}
 

http://git-wip-us.apache.org/repos/asf/spark/blob/bf578dea/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala
index 1481d70..4c96b9e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala
@@ -21,6 +21,10 @@ import java.nio.ByteBuffer
 
 import org.apache.spark.util.SerializableBuffer
 
+/**
+ * Description of a task that gets passed onto executors to be executed, usually created by
+ * [[TaskSetManager.resourceOffer]].
+ */
 private[spark] class TaskDescription(
     val taskId: Long,
     val executorId: String,

http://git-wip-us.apache.org/repos/asf/spark/blob/bf578dea/core/src/main/scala/org/apache/spark/shuffle/FetchFailedException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/FetchFailedException.scala b/core/src/main/scala/org/apache/spark/shuffle/FetchFailedException.scala
new file mode 100644
index 0000000..71c08e9
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/shuffle/FetchFailedException.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle
+
+import org.apache.spark.storage.BlockManagerId
+import org.apache.spark.{FetchFailed, TaskEndReason}
+
+/**
+ * Failed to fetch a shuffle block. The executor catches this exception and propagates it
+ * back to DAGScheduler (through TaskEndReason) so we'd resubmit the previous stage.
+ *
+ * Note that bmAddress can be null.
+ */
+private[spark] class FetchFailedException(
+    bmAddress: BlockManagerId,
+    shuffleId: Int,
+    mapId: Int,
+    reduceId: Int)
+  extends Exception {
+
+  override def getMessage: String =
+    "Fetch failed: %s %d %d %d".format(bmAddress, shuffleId, mapId, reduceId)
+
+  def toTaskEndReason: TaskEndReason = FetchFailed(bmAddress, shuffleId, mapId, reduceId)
+}
+
+/**
+ * Failed to get shuffle metadata from [[org.apache.spark.MapOutputTracker]].
+ */
+private[spark] class MetadataFetchFailedException(
+    shuffleId: Int,
+    reduceId: Int,
+    message: String)
+  extends FetchFailedException(null, shuffleId, -1, reduceId) {
+
+  override def getMessage: String = message
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/bf578dea/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala
index b05b6ea..a932455 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala
@@ -20,11 +20,12 @@ package org.apache.spark.shuffle.hash
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.mutable.HashMap
 
+import org.apache.spark._
 import org.apache.spark.executor.ShuffleReadMetrics
 import org.apache.spark.serializer.Serializer
+import org.apache.spark.shuffle.FetchFailedException
 import org.apache.spark.storage.{BlockId, BlockManagerId, ShuffleBlockId}
 import org.apache.spark.util.CompletionIterator
-import org.apache.spark._
 
 private[hash] object BlockStoreShuffleFetcher extends Logging {
   def fetch[T](
@@ -63,7 +64,7 @@ private[hash] object BlockStoreShuffleFetcher extends Logging {
           blockId match {
             case ShuffleBlockId(shufId, mapId, _) =>
               val address = statuses(mapId.toInt)._1
-              throw new FetchFailedException(address, shufId.toInt, mapId.toInt, reduceId, null)
+              throw new FetchFailedException(address, shufId.toInt, mapId.toInt, reduceId)
             case _ =>
               throw new SparkException(
                 "Failed to get block " + blockId + ", which is not a shuffle block")

http://git-wip-us.apache.org/repos/asf/spark/blob/bf578dea/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
index 95ba273..9702838 100644
--- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -24,6 +24,7 @@ import akka.testkit.TestActorRef
 import org.scalatest.FunSuite
 
 import org.apache.spark.scheduler.MapStatus
+import org.apache.spark.shuffle.FetchFailedException
 import org.apache.spark.storage.BlockManagerId
 import org.apache.spark.util.AkkaUtils