You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2014/06/27 06:12:19 UTC
git commit: Removed throwable field from FetchFailedException and
added MetadataFetchFailedException
Repository: spark
Updated Branches:
refs/heads/master 981bde9b0 -> bf578deaf
Removed throwable field from FetchFailedException and added MetadataFetchFailedException
FetchFailedException used to have a Throwable field, but in practice we never propagate that throwable back to the driver, because Executor explicitly catches FetchFailedException and then sends FetchFailed as the TaskEndReason.
This pull request removes the throwable and adds a MetadataFetchFailedException that extends FetchFailedException (so now MapOutputTracker throws MetadataFetchFailedException instead).
Author: Reynold Xin <rx...@apache.org>
Closes #1227 from rxin/metadataFetchException and squashes the following commits:
5cb1e0a [Reynold Xin] MetadataFetchFailedException extends FetchFailedException.
8861ee2 [Reynold Xin] Throw MetadataFetchFailedException in MapOutputTracker.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bf578dea
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bf578dea
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bf578dea
Branch: refs/heads/master
Commit: bf578deaf2493081ceeb78dfd7617def5699a06e
Parents: 981bde9
Author: Reynold Xin <rx...@apache.org>
Authored: Thu Jun 26 21:12:16 2014 -0700
Committer: Reynold Xin <rx...@apache.org>
Committed: Thu Jun 26 21:12:16 2014 -0700
----------------------------------------------------------------------
.../org/apache/spark/FetchFailedException.scala | 45 -----------------
.../org/apache/spark/MapOutputTracker.scala | 12 +++--
.../scala/org/apache/spark/TaskEndReason.scala | 2 +-
.../org/apache/spark/executor/Executor.scala | 2 +-
.../spark/scheduler/TaskDescription.scala | 4 ++
.../spark/shuffle/FetchFailedException.scala | 52 ++++++++++++++++++++
.../shuffle/hash/BlockStoreShuffleFetcher.scala | 5 +-
.../apache/spark/MapOutputTrackerSuite.scala | 1 +
8 files changed, 69 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/bf578dea/core/src/main/scala/org/apache/spark/FetchFailedException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/FetchFailedException.scala b/core/src/main/scala/org/apache/spark/FetchFailedException.scala
deleted file mode 100644
index 8eaa26b..0000000
--- a/core/src/main/scala/org/apache/spark/FetchFailedException.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark
-
-import org.apache.spark.storage.BlockManagerId
-
-private[spark] class FetchFailedException(
- taskEndReason: TaskEndReason,
- message: String,
- cause: Throwable)
- extends Exception {
-
- def this (bmAddress: BlockManagerId, shuffleId: Int, mapId: Int, reduceId: Int,
- cause: Throwable) =
- this(FetchFailed(bmAddress, shuffleId, mapId, reduceId),
- "Fetch failed: %s %d %d %d".format(bmAddress, shuffleId, mapId, reduceId),
- cause)
-
- def this (shuffleId: Int, reduceId: Int, cause: Throwable) =
- this(FetchFailed(null, shuffleId, -1, reduceId),
- "Unable to fetch locations from master: %d %d".format(shuffleId, reduceId), cause)
-
- override def getMessage(): String = message
-
-
- override def getCause(): Throwable = cause
-
- def toTaskEndReason: TaskEndReason = taskEndReason
-
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/bf578dea/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index ee82d9f..182abac 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -25,9 +25,11 @@ import scala.concurrent.Await
import akka.actor._
import akka.pattern.ask
+
+import org.apache.spark.util._
import org.apache.spark.scheduler.MapStatus
+import org.apache.spark.shuffle.MetadataFetchFailedException
import org.apache.spark.storage.BlockManagerId
-import org.apache.spark.util._
private[spark] sealed trait MapOutputTrackerMessage
private[spark] case class GetMapOutputStatuses(shuffleId: Int)
@@ -168,8 +170,8 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
return MapOutputTracker.convertMapStatuses(shuffleId, reduceId, fetchedStatuses)
}
} else {
- throw new FetchFailedException(null, shuffleId, -1, reduceId,
- new Exception("Missing all output locations for shuffle " + shuffleId))
+ throw new MetadataFetchFailedException(
+ shuffleId, reduceId, "Missing all output locations for shuffle " + shuffleId)
}
} else {
statuses.synchronized {
@@ -371,8 +373,8 @@ private[spark] object MapOutputTracker {
statuses.map {
status =>
if (status == null) {
- throw new FetchFailedException(null, shuffleId, -1, reduceId,
- new Exception("Missing an output location for shuffle " + shuffleId))
+ throw new MetadataFetchFailedException(
+ shuffleId, reduceId, "Missing an output location for shuffle " + shuffleId)
} else {
(status.location, decompressSize(status.compressedSizes(reduceId)))
}
http://git-wip-us.apache.org/repos/asf/spark/blob/bf578dea/core/src/main/scala/org/apache/spark/TaskEndReason.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
index 5e8bd8c..df42d67 100644
--- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala
+++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -65,7 +65,7 @@ case object Resubmitted extends TaskFailedReason {
*/
@DeveloperApi
case class FetchFailed(
- bmAddress: BlockManagerId,
+ bmAddress: BlockManagerId, // Note that bmAddress can be null
shuffleId: Int,
mapId: Int,
reduceId: Int)
http://git-wip-us.apache.org/repos/asf/spark/blob/bf578dea/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 557b9a3..4d3ba11 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -26,8 +26,8 @@ import scala.collection.JavaConversions._
import scala.collection.mutable.HashMap
import org.apache.spark._
-import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.scheduler._
+import org.apache.spark.shuffle.FetchFailedException
import org.apache.spark.storage.{StorageLevel, TaskResultBlockId}
import org.apache.spark.util.{AkkaUtils, Utils}
http://git-wip-us.apache.org/repos/asf/spark/blob/bf578dea/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala
index 1481d70..4c96b9e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala
@@ -21,6 +21,10 @@ import java.nio.ByteBuffer
import org.apache.spark.util.SerializableBuffer
+/**
+ * Description of a task that gets passed on to executors to be executed, usually created by
+ * [[TaskSetManager.resourceOffer]].
+ */
private[spark] class TaskDescription(
val taskId: Long,
val executorId: String,
http://git-wip-us.apache.org/repos/asf/spark/blob/bf578dea/core/src/main/scala/org/apache/spark/shuffle/FetchFailedException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/FetchFailedException.scala b/core/src/main/scala/org/apache/spark/shuffle/FetchFailedException.scala
new file mode 100644
index 0000000..71c08e9
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/shuffle/FetchFailedException.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle
+
+import org.apache.spark.storage.BlockManagerId
+import org.apache.spark.{FetchFailed, TaskEndReason}
+
+/**
+ * Failed to fetch a shuffle block. The executor catches this exception and propagates it
+ * back to DAGScheduler (through TaskEndReason) so we'd resubmit the previous stage.
+ *
+ * Note that bmAddress can be null.
+ */
+private[spark] class FetchFailedException(
+ bmAddress: BlockManagerId,
+ shuffleId: Int,
+ mapId: Int,
+ reduceId: Int)
+ extends Exception {
+
+ override def getMessage: String =
+ "Fetch failed: %s %d %d %d".format(bmAddress, shuffleId, mapId, reduceId)
+
+ def toTaskEndReason: TaskEndReason = FetchFailed(bmAddress, shuffleId, mapId, reduceId)
+}
+
+/**
+ * Failed to get shuffle metadata from [[org.apache.spark.MapOutputTracker]].
+ */
+private[spark] class MetadataFetchFailedException(
+ shuffleId: Int,
+ reduceId: Int,
+ message: String)
+ extends FetchFailedException(null, shuffleId, -1, reduceId) {
+
+ override def getMessage: String = message
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/bf578dea/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala
index b05b6ea..a932455 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala
@@ -20,11 +20,12 @@ package org.apache.spark.shuffle.hash
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
+import org.apache.spark._
import org.apache.spark.executor.ShuffleReadMetrics
import org.apache.spark.serializer.Serializer
+import org.apache.spark.shuffle.FetchFailedException
import org.apache.spark.storage.{BlockId, BlockManagerId, ShuffleBlockId}
import org.apache.spark.util.CompletionIterator
-import org.apache.spark._
private[hash] object BlockStoreShuffleFetcher extends Logging {
def fetch[T](
@@ -63,7 +64,7 @@ private[hash] object BlockStoreShuffleFetcher extends Logging {
blockId match {
case ShuffleBlockId(shufId, mapId, _) =>
val address = statuses(mapId.toInt)._1
- throw new FetchFailedException(address, shufId.toInt, mapId.toInt, reduceId, null)
+ throw new FetchFailedException(address, shufId.toInt, mapId.toInt, reduceId)
case _ =>
throw new SparkException(
"Failed to get block " + blockId + ", which is not a shuffle block")
http://git-wip-us.apache.org/repos/asf/spark/blob/bf578dea/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
index 95ba273..9702838 100644
--- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -24,6 +24,7 @@ import akka.testkit.TestActorRef
import org.scalatest.FunSuite
import org.apache.spark.scheduler.MapStatus
+import org.apache.spark.shuffle.FetchFailedException
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util.AkkaUtils