You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by jo...@apache.org on 2015/05/08 21:30:40 UTC

[2/2] spark git commit: [SPARK-6627] Finished rename to ShuffleBlockResolver

[SPARK-6627] Finished rename to ShuffleBlockResolver

The previous cleanup-commit for SPARK-6627 renamed ShuffleBlockManager
to ShuffleBlockResolver, but didn't rename the associated subclasses and
variables; this commit does that.

I'm unsure whether it's ok to rename ExternalShuffleBlockManager, since that's technically a public class?

cc pwendell

Author: Kay Ousterhout <ka...@gmail.com>

Closes #5764 from kayousterhout/SPARK-6627 and squashes the following commits:

43add1e [Kay Ousterhout] Spacing fix
96080bf [Kay Ousterhout] Test fixes
d8a5d36 [Kay Ousterhout] [SPARK-6627] Finished rename to ShuffleBlockResolver


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4b3bb0e4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4b3bb0e4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4b3bb0e4

Branch: refs/heads/master
Commit: 4b3bb0e43ca7e1a27308516608419487b6a844e6
Parents: 2d05f32
Author: Kay Ousterhout <ka...@gmail.com>
Authored: Fri May 8 12:24:06 2015 -0700
Committer: Josh Rosen <jo...@databricks.com>
Committed: Fri May 8 12:24:06 2015 -0700

----------------------------------------------------------------------
 .../spark/shuffle/FileShuffleBlockManager.scala | 300 -------------------
 .../shuffle/FileShuffleBlockResolver.scala      | 297 ++++++++++++++++++
 .../shuffle/IndexShuffleBlockManager.scala      | 124 --------
 .../shuffle/IndexShuffleBlockResolver.scala     | 122 ++++++++
 .../spark/shuffle/hash/HashShuffleManager.scala |   6 +-
 .../spark/shuffle/hash/HashShuffleWriter.scala  |   4 +-
 .../spark/shuffle/sort/SortShuffleManager.scala |   6 +-
 .../spark/shuffle/sort/SortShuffleWriter.scala  |  12 +-
 .../org/apache/spark/storage/BlockId.scala      |   2 +-
 .../org/apache/spark/storage/BlockManager.scala |   5 +-
 .../apache/spark/storage/DiskBlockManager.scala |   2 +-
 .../shuffle/hash/HashShuffleManagerSuite.scala  |  18 +-
 .../shuffle/ExternalShuffleBlockHandler.java    |   6 +-
 .../shuffle/ExternalShuffleBlockManager.java    | 254 ----------------
 .../shuffle/ExternalShuffleBlockResolver.java   | 254 ++++++++++++++++
 .../ExternalShuffleBlockHandlerSuite.java       |  16 +-
 .../ExternalShuffleBlockManagerSuite.java       | 129 --------
 .../ExternalShuffleBlockResolverSuite.java      | 129 ++++++++
 .../shuffle/ExternalShuffleCleanupSuite.java    |  37 +--
 .../network/shuffle/TestShuffleDataContext.java |   8 +-
 20 files changed, 865 insertions(+), 866 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4b3bb0e4/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala
deleted file mode 100644
index e9b4e2b..0000000
--- a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala
+++ /dev/null
@@ -1,300 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.shuffle
-
-import java.io.File
-import java.nio.ByteBuffer
-import java.util.concurrent.ConcurrentLinkedQueue
-import java.util.concurrent.atomic.AtomicInteger
-
-import scala.collection.JavaConversions._
-
-import org.apache.spark.{Logging, SparkConf, SparkEnv}
-import org.apache.spark.executor.ShuffleWriteMetrics
-import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
-import org.apache.spark.network.netty.SparkTransportConf
-import org.apache.spark.serializer.Serializer
-import org.apache.spark.shuffle.FileShuffleBlockManager.ShuffleFileGroup
-import org.apache.spark.storage._
-import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, TimeStampedHashMap}
-import org.apache.spark.util.collection.{PrimitiveKeyOpenHashMap, PrimitiveVector}
-
-/** A group of writers for a ShuffleMapTask, one writer per reducer. */
-private[spark] trait ShuffleWriterGroup {
-  val writers: Array[BlockObjectWriter]
-
-  /** @param success Indicates all writes were successful. If false, no blocks will be recorded. */
-  def releaseWriters(success: Boolean)
-}
-
-/**
- * Manages assigning disk-based block writers to shuffle tasks. Each shuffle task gets one file
- * per reducer (this set of files is called a ShuffleFileGroup).
- *
- * As an optimization to reduce the number of physical shuffle files produced, multiple shuffle
- * blocks are aggregated into the same file. There is one "combined shuffle file" per reducer
- * per concurrently executing shuffle task. As soon as a task finishes writing to its shuffle
- * files, it releases them for another task.
- * Regarding the implementation of this feature, shuffle files are identified by a 3-tuple:
- *   - shuffleId: The unique id given to the entire shuffle stage.
- *   - bucketId: The id of the output partition (i.e., reducer id)
- *   - fileId: The unique id identifying a group of "combined shuffle files." Only one task at a
- *       time owns a particular fileId, and this id is returned to a pool when the task finishes.
- * Each shuffle file is then mapped to a FileSegment, which is a 3-tuple (file, offset, length)
- * that specifies where in a given file the actual block data is located.
- *
- * Shuffle file metadata is stored in a space-efficient manner. Rather than simply mapping
- * ShuffleBlockIds directly to FileSegments, each ShuffleFileGroup maintains a list of offsets for
- * each block stored in each file. In order to find the location of a shuffle block, we search the
- * files within a ShuffleFileGroups associated with the block's reducer.
- */
-// Note: Changes to the format in this file should be kept in sync with
-// org.apache.spark.network.shuffle.StandaloneShuffleBlockManager#getHashBasedShuffleBlockData().
-private[spark]
-class FileShuffleBlockManager(conf: SparkConf)
-  extends ShuffleBlockResolver with Logging {
-
-  private val transportConf = SparkTransportConf.fromSparkConf(conf)
-
-  private lazy val blockManager = SparkEnv.get.blockManager
-
-  // Turning off shuffle file consolidation causes all shuffle Blocks to get their own file.
-  // TODO: Remove this once the shuffle file consolidation feature is stable.
-  private val consolidateShuffleFiles =
-    conf.getBoolean("spark.shuffle.consolidateFiles", false)
-
-  // Use getSizeAsKb (not bytes) to maintain backwards compatibility of on units are provided 
-  private val bufferSize = conf.getSizeAsKb("spark.shuffle.file.buffer", "32k").toInt * 1024
-
-  /**
-   * Contains all the state related to a particular shuffle. This includes a pool of unused
-   * ShuffleFileGroups, as well as all ShuffleFileGroups that have been created for the shuffle.
-   */
-  private class ShuffleState(val numBuckets: Int) {
-    val nextFileId = new AtomicInteger(0)
-    val unusedFileGroups = new ConcurrentLinkedQueue[ShuffleFileGroup]()
-    val allFileGroups = new ConcurrentLinkedQueue[ShuffleFileGroup]()
-
-    /**
-     * The mapIds of all map tasks completed on this Executor for this shuffle.
-     * NB: This is only populated if consolidateShuffleFiles is FALSE. We don't need it otherwise.
-     */
-    val completedMapTasks = new ConcurrentLinkedQueue[Int]()
-  }
-
-  private val shuffleStates = new TimeStampedHashMap[ShuffleId, ShuffleState]
-
-  private val metadataCleaner =
-    new MetadataCleaner(MetadataCleanerType.SHUFFLE_BLOCK_MANAGER, this.cleanup, conf)
-
-  /**
-   * Get a ShuffleWriterGroup for the given map task, which will register it as complete
-   * when the writers are closed successfully
-   */
-  def forMapTask(shuffleId: Int, mapId: Int, numBuckets: Int, serializer: Serializer,
-      writeMetrics: ShuffleWriteMetrics): ShuffleWriterGroup = {
-    new ShuffleWriterGroup {
-      shuffleStates.putIfAbsent(shuffleId, new ShuffleState(numBuckets))
-      private val shuffleState = shuffleStates(shuffleId)
-      private var fileGroup: ShuffleFileGroup = null
-
-      val openStartTime = System.nanoTime
-      val serializerInstance = serializer.newInstance()
-      val writers: Array[BlockObjectWriter] = if (consolidateShuffleFiles) {
-        fileGroup = getUnusedFileGroup()
-        Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
-          val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
-          blockManager.getDiskWriter(blockId, fileGroup(bucketId), serializerInstance, bufferSize,
-            writeMetrics)
-        }
-      } else {
-        Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
-          val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
-          val blockFile = blockManager.diskBlockManager.getFile(blockId)
-          // Because of previous failures, the shuffle file may already exist on this machine.
-          // If so, remove it.
-          if (blockFile.exists) {
-            if (blockFile.delete()) {
-              logInfo(s"Removed existing shuffle file $blockFile")
-            } else {
-              logWarning(s"Failed to remove existing shuffle file $blockFile")
-            }
-          }
-          blockManager.getDiskWriter(blockId, blockFile, serializerInstance, bufferSize,
-            writeMetrics)
-        }
-      }
-      // Creating the file to write to and creating a disk writer both involve interacting with
-      // the disk, so should be included in the shuffle write time.
-      writeMetrics.incShuffleWriteTime(System.nanoTime - openStartTime)
-
-      override def releaseWriters(success: Boolean) {
-        if (consolidateShuffleFiles) {
-          if (success) {
-            val offsets = writers.map(_.fileSegment().offset)
-            val lengths = writers.map(_.fileSegment().length)
-            fileGroup.recordMapOutput(mapId, offsets, lengths)
-          }
-          recycleFileGroup(fileGroup)
-        } else {
-          shuffleState.completedMapTasks.add(mapId)
-        }
-      }
-
-      private def getUnusedFileGroup(): ShuffleFileGroup = {
-        val fileGroup = shuffleState.unusedFileGroups.poll()
-        if (fileGroup != null) fileGroup else newFileGroup()
-      }
-
-      private def newFileGroup(): ShuffleFileGroup = {
-        val fileId = shuffleState.nextFileId.getAndIncrement()
-        val files = Array.tabulate[File](numBuckets) { bucketId =>
-          val filename = physicalFileName(shuffleId, bucketId, fileId)
-          blockManager.diskBlockManager.getFile(filename)
-        }
-        val fileGroup = new ShuffleFileGroup(shuffleId, fileId, files)
-        shuffleState.allFileGroups.add(fileGroup)
-        fileGroup
-      }
-
-      private def recycleFileGroup(group: ShuffleFileGroup) {
-        shuffleState.unusedFileGroups.add(group)
-      }
-    }
-  }
-
-  override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = {
-    if (consolidateShuffleFiles) {
-      // Search all file groups associated with this shuffle.
-      val shuffleState = shuffleStates(blockId.shuffleId)
-      val iter = shuffleState.allFileGroups.iterator
-      while (iter.hasNext) {
-        val segmentOpt = iter.next.getFileSegmentFor(blockId.mapId, blockId.reduceId)
-        if (segmentOpt.isDefined) {
-          val segment = segmentOpt.get
-          return new FileSegmentManagedBuffer(
-            transportConf, segment.file, segment.offset, segment.length)
-        }
-      }
-      throw new IllegalStateException("Failed to find shuffle block: " + blockId)
-    } else {
-      val file = blockManager.diskBlockManager.getFile(blockId)
-      new FileSegmentManagedBuffer(transportConf, file, 0, file.length)
-    }
-  }
-
-  /** Remove all the blocks / files and metadata related to a particular shuffle. */
-  def removeShuffle(shuffleId: ShuffleId): Boolean = {
-    // Do not change the ordering of this, if shuffleStates should be removed only
-    // after the corresponding shuffle blocks have been removed
-    val cleaned = removeShuffleBlocks(shuffleId)
-    shuffleStates.remove(shuffleId)
-    cleaned
-  }
-
-  /** Remove all the blocks / files related to a particular shuffle. */
-  private def removeShuffleBlocks(shuffleId: ShuffleId): Boolean = {
-    shuffleStates.get(shuffleId) match {
-      case Some(state) =>
-        if (consolidateShuffleFiles) {
-          for (fileGroup <- state.allFileGroups; file <- fileGroup.files) {
-            file.delete()
-          }
-        } else {
-          for (mapId <- state.completedMapTasks; reduceId <- 0 until state.numBuckets) {
-            val blockId = new ShuffleBlockId(shuffleId, mapId, reduceId)
-            blockManager.diskBlockManager.getFile(blockId).delete()
-          }
-        }
-        logInfo("Deleted all files for shuffle " + shuffleId)
-        true
-      case None =>
-        logInfo("Could not find files for shuffle " + shuffleId + " for deleting")
-        false
-    }
-  }
-
-  private def physicalFileName(shuffleId: Int, bucketId: Int, fileId: Int) = {
-    "merged_shuffle_%d_%d_%d".format(shuffleId, bucketId, fileId)
-  }
-
-  private def cleanup(cleanupTime: Long) {
-    shuffleStates.clearOldValues(cleanupTime, (shuffleId, state) => removeShuffleBlocks(shuffleId))
-  }
-
-  override def stop() {
-    metadataCleaner.cancel()
-  }
-}
-
-private[spark]
-object FileShuffleBlockManager {
-  /**
-   * A group of shuffle files, one per reducer.
-   * A particular mapper will be assigned a single ShuffleFileGroup to write its output to.
-   */
-  private class ShuffleFileGroup(val shuffleId: Int, val fileId: Int, val files: Array[File]) {
-    private var numBlocks: Int = 0
-
-    /**
-     * Stores the absolute index of each mapId in the files of this group. For instance,
-     * if mapId 5 is the first block in each file, mapIdToIndex(5) = 0.
-     */
-    private val mapIdToIndex = new PrimitiveKeyOpenHashMap[Int, Int]()
-
-    /**
-     * Stores consecutive offsets and lengths of blocks into each reducer file, ordered by
-     * position in the file.
-     * Note: mapIdToIndex(mapId) returns the index of the mapper into the vector for every
-     * reducer.
-     */
-    private val blockOffsetsByReducer = Array.fill[PrimitiveVector[Long]](files.length) {
-      new PrimitiveVector[Long]()
-    }
-    private val blockLengthsByReducer = Array.fill[PrimitiveVector[Long]](files.length) {
-      new PrimitiveVector[Long]()
-    }
-
-    def apply(bucketId: Int): File = files(bucketId)
-
-    def recordMapOutput(mapId: Int, offsets: Array[Long], lengths: Array[Long]) {
-      assert(offsets.length == lengths.length)
-      mapIdToIndex(mapId) = numBlocks
-      numBlocks += 1
-      for (i <- 0 until offsets.length) {
-        blockOffsetsByReducer(i) += offsets(i)
-        blockLengthsByReducer(i) += lengths(i)
-      }
-    }
-
-    /** Returns the FileSegment associated with the given map task, or None if no entry exists. */
-    def getFileSegmentFor(mapId: Int, reducerId: Int): Option[FileSegment] = {
-      val file = files(reducerId)
-      val blockOffsets = blockOffsetsByReducer(reducerId)
-      val blockLengths = blockLengthsByReducer(reducerId)
-      val index = mapIdToIndex.getOrElse(mapId, -1)
-      if (index >= 0) {
-        val offset = blockOffsets(index)
-        val length = blockLengths(index)
-        Some(new FileSegment(file, offset, length))
-      } else {
-        None
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/4b3bb0e4/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala
new file mode 100644
index 0000000..6ad427b
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle
+
+import java.io.File
+import java.util.concurrent.ConcurrentLinkedQueue
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.JavaConversions._
+
+import org.apache.spark.{Logging, SparkConf, SparkEnv}
+import org.apache.spark.executor.ShuffleWriteMetrics
+import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
+import org.apache.spark.network.netty.SparkTransportConf
+import org.apache.spark.serializer.Serializer
+import org.apache.spark.shuffle.FileShuffleBlockResolver.ShuffleFileGroup
+import org.apache.spark.storage._
+import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, TimeStampedHashMap}
+import org.apache.spark.util.collection.{PrimitiveKeyOpenHashMap, PrimitiveVector}
+
+/** A group of writers for a ShuffleMapTask, one writer per reducer. */
+private[spark] trait ShuffleWriterGroup {
+  val writers: Array[BlockObjectWriter]
+
+  /** @param success Indicates all writes were successful. If false, no blocks will be recorded. */
+  def releaseWriters(success: Boolean)
+}
+
+/**
+ * Manages assigning disk-based block writers to shuffle tasks. Each shuffle task gets one file
+ * per reducer (this set of files is called a ShuffleFileGroup).
+ *
+ * As an optimization to reduce the number of physical shuffle files produced, multiple shuffle
+ * blocks are aggregated into the same file. There is one "combined shuffle file" per reducer
+ * per concurrently executing shuffle task. As soon as a task finishes writing to its shuffle
+ * files, it releases them for another task.
+ * Regarding the implementation of this feature, shuffle files are identified by a 3-tuple:
+ *   - shuffleId: The unique id given to the entire shuffle stage.
+ *   - bucketId: The id of the output partition (i.e., reducer id)
+ *   - fileId: The unique id identifying a group of "combined shuffle files." Only one task at a
+ *       time owns a particular fileId, and this id is returned to a pool when the task finishes.
+ * Each shuffle file is then mapped to a FileSegment, which is a 3-tuple (file, offset, length)
+ * that specifies where in a given file the actual block data is located.
+ *
+ * Shuffle file metadata is stored in a space-efficient manner. Rather than simply mapping
+ * ShuffleBlockIds directly to FileSegments, each ShuffleFileGroup maintains a list of offsets for
+ * each block stored in each file. In order to find the location of a shuffle block, we search the
+ * files within the ShuffleFileGroups associated with the block's reducer.
+ */
+// Note: Changes to the format in this file should be kept in sync with
+// org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getHashBasedShuffleBlockData().
+private[spark] class FileShuffleBlockResolver(conf: SparkConf)
+  extends ShuffleBlockResolver with Logging {
+
+  private val transportConf = SparkTransportConf.fromSparkConf(conf)
+
+  private lazy val blockManager = SparkEnv.get.blockManager
+
+  // Turning off shuffle file consolidation causes all shuffle Blocks to get their own file.
+  // TODO: Remove this once the shuffle file consolidation feature is stable.
+  private val consolidateShuffleFiles =
+    conf.getBoolean("spark.shuffle.consolidateFiles", false)
+
+  // Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
+  private val bufferSize = conf.getSizeAsKb("spark.shuffle.file.buffer", "32k").toInt * 1024
+
+  /**
+   * Contains all the state related to a particular shuffle. This includes a pool of unused
+   * ShuffleFileGroups, as well as all ShuffleFileGroups that have been created for the shuffle.
+   */
+  private class ShuffleState(val numBuckets: Int) {
+    val nextFileId = new AtomicInteger(0)
+    val unusedFileGroups = new ConcurrentLinkedQueue[ShuffleFileGroup]()
+    val allFileGroups = new ConcurrentLinkedQueue[ShuffleFileGroup]()
+
+    /**
+     * The mapIds of all map tasks completed on this Executor for this shuffle.
+     * NB: This is only populated if consolidateShuffleFiles is FALSE. We don't need it otherwise.
+     */
+    val completedMapTasks = new ConcurrentLinkedQueue[Int]()
+  }
+
+  private val shuffleStates = new TimeStampedHashMap[ShuffleId, ShuffleState]
+
+  private val metadataCleaner =
+    new MetadataCleaner(MetadataCleanerType.SHUFFLE_BLOCK_MANAGER, this.cleanup, conf)
+
+  /**
+   * Get a ShuffleWriterGroup for the given map task, which will register it as complete
+   * when the writers are closed successfully
+   */
+  def forMapTask(shuffleId: Int, mapId: Int, numBuckets: Int, serializer: Serializer,
+      writeMetrics: ShuffleWriteMetrics): ShuffleWriterGroup = {
+    new ShuffleWriterGroup {
+      shuffleStates.putIfAbsent(shuffleId, new ShuffleState(numBuckets))
+      private val shuffleState = shuffleStates(shuffleId)
+      private var fileGroup: ShuffleFileGroup = null
+
+      val openStartTime = System.nanoTime
+      val serializerInstance = serializer.newInstance()
+      val writers: Array[BlockObjectWriter] = if (consolidateShuffleFiles) {
+        fileGroup = getUnusedFileGroup()
+        Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
+          val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
+          blockManager.getDiskWriter(blockId, fileGroup(bucketId), serializerInstance, bufferSize,
+            writeMetrics)
+        }
+      } else {
+        Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
+          val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
+          val blockFile = blockManager.diskBlockManager.getFile(blockId)
+          // Because of previous failures, the shuffle file may already exist on this machine.
+          // If so, remove it.
+          if (blockFile.exists) {
+            if (blockFile.delete()) {
+              logInfo(s"Removed existing shuffle file $blockFile")
+            } else {
+              logWarning(s"Failed to remove existing shuffle file $blockFile")
+            }
+          }
+          blockManager.getDiskWriter(blockId, blockFile, serializerInstance, bufferSize,
+            writeMetrics)
+        }
+      }
+      // Creating the file to write to and creating a disk writer both involve interacting with
+      // the disk, so should be included in the shuffle write time.
+      writeMetrics.incShuffleWriteTime(System.nanoTime - openStartTime)
+
+      override def releaseWriters(success: Boolean) {
+        if (consolidateShuffleFiles) {
+          if (success) {
+            val offsets = writers.map(_.fileSegment().offset)
+            val lengths = writers.map(_.fileSegment().length)
+            fileGroup.recordMapOutput(mapId, offsets, lengths)
+          }
+          recycleFileGroup(fileGroup)
+        } else {
+          shuffleState.completedMapTasks.add(mapId)
+        }
+      }
+
+      private def getUnusedFileGroup(): ShuffleFileGroup = {
+        val fileGroup = shuffleState.unusedFileGroups.poll()
+        if (fileGroup != null) fileGroup else newFileGroup()
+      }
+
+      private def newFileGroup(): ShuffleFileGroup = {
+        val fileId = shuffleState.nextFileId.getAndIncrement()
+        val files = Array.tabulate[File](numBuckets) { bucketId =>
+          val filename = physicalFileName(shuffleId, bucketId, fileId)
+          blockManager.diskBlockManager.getFile(filename)
+        }
+        val fileGroup = new ShuffleFileGroup(shuffleId, fileId, files)
+        shuffleState.allFileGroups.add(fileGroup)
+        fileGroup
+      }
+
+      private def recycleFileGroup(group: ShuffleFileGroup) {
+        shuffleState.unusedFileGroups.add(group)
+      }
+    }
+  }
+
+  override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = {
+    if (consolidateShuffleFiles) {
+      // Search all file groups associated with this shuffle.
+      val shuffleState = shuffleStates(blockId.shuffleId)
+      val iter = shuffleState.allFileGroups.iterator
+      while (iter.hasNext) {
+        val segmentOpt = iter.next.getFileSegmentFor(blockId.mapId, blockId.reduceId)
+        if (segmentOpt.isDefined) {
+          val segment = segmentOpt.get
+          return new FileSegmentManagedBuffer(
+            transportConf, segment.file, segment.offset, segment.length)
+        }
+      }
+      throw new IllegalStateException("Failed to find shuffle block: " + blockId)
+    } else {
+      val file = blockManager.diskBlockManager.getFile(blockId)
+      new FileSegmentManagedBuffer(transportConf, file, 0, file.length)
+    }
+  }
+
+  /** Remove all the blocks / files and metadata related to a particular shuffle. */
+  def removeShuffle(shuffleId: ShuffleId): Boolean = {
+    // Do not change the ordering of these calls: shuffleStates must be removed only
+    // after the corresponding shuffle blocks have been removed
+    val cleaned = removeShuffleBlocks(shuffleId)
+    shuffleStates.remove(shuffleId)
+    cleaned
+  }
+
+  /** Remove all the blocks / files related to a particular shuffle. */
+  private def removeShuffleBlocks(shuffleId: ShuffleId): Boolean = {
+    shuffleStates.get(shuffleId) match {
+      case Some(state) =>
+        if (consolidateShuffleFiles) {
+          for (fileGroup <- state.allFileGroups; file <- fileGroup.files) {
+            file.delete()
+          }
+        } else {
+          for (mapId <- state.completedMapTasks; reduceId <- 0 until state.numBuckets) {
+            val blockId = new ShuffleBlockId(shuffleId, mapId, reduceId)
+            blockManager.diskBlockManager.getFile(blockId).delete()
+          }
+        }
+        logInfo("Deleted all files for shuffle " + shuffleId)
+        true
+      case None =>
+        logInfo("Could not find files for shuffle " + shuffleId + " for deleting")
+        false
+    }
+  }
+
+  private def physicalFileName(shuffleId: Int, bucketId: Int, fileId: Int) = {
+    "merged_shuffle_%d_%d_%d".format(shuffleId, bucketId, fileId)
+  }
+
+  private def cleanup(cleanupTime: Long) {
+    shuffleStates.clearOldValues(cleanupTime, (shuffleId, state) => removeShuffleBlocks(shuffleId))
+  }
+
+  override def stop() {
+    metadataCleaner.cancel()
+  }
+}
+
+private[spark] object FileShuffleBlockResolver {
+  /**
+   * A group of shuffle files, one per reducer.
+   * A particular mapper will be assigned a single ShuffleFileGroup to write its output to.
+   */
+  private class ShuffleFileGroup(val shuffleId: Int, val fileId: Int, val files: Array[File]) {
+    private var numBlocks: Int = 0
+
+    /**
+     * Stores the absolute index of each mapId in the files of this group. For instance,
+     * if mapId 5 is the first block in each file, mapIdToIndex(5) = 0.
+     */
+    private val mapIdToIndex = new PrimitiveKeyOpenHashMap[Int, Int]()
+
+    /**
+     * Stores consecutive offsets and lengths of blocks into each reducer file, ordered by
+     * position in the file.
+     * Note: mapIdToIndex(mapId) returns the index of the mapper into the vector for every
+     * reducer.
+     */
+    private val blockOffsetsByReducer = Array.fill[PrimitiveVector[Long]](files.length) {
+      new PrimitiveVector[Long]()
+    }
+    private val blockLengthsByReducer = Array.fill[PrimitiveVector[Long]](files.length) {
+      new PrimitiveVector[Long]()
+    }
+
+    def apply(bucketId: Int): File = files(bucketId)
+
+    def recordMapOutput(mapId: Int, offsets: Array[Long], lengths: Array[Long]) {
+      assert(offsets.length == lengths.length)
+      mapIdToIndex(mapId) = numBlocks
+      numBlocks += 1
+      for (i <- 0 until offsets.length) {
+        blockOffsetsByReducer(i) += offsets(i)
+        blockLengthsByReducer(i) += lengths(i)
+      }
+    }
+
+    /** Returns the FileSegment associated with the given map task, or None if no entry exists. */
+    def getFileSegmentFor(mapId: Int, reducerId: Int): Option[FileSegment] = {
+      val file = files(reducerId)
+      val blockOffsets = blockOffsetsByReducer(reducerId)
+      val blockLengths = blockLengthsByReducer(reducerId)
+      val index = mapIdToIndex.getOrElse(mapId, -1)
+      if (index >= 0) {
+        val offset = blockOffsets(index)
+        val length = blockLengths(index)
+        Some(new FileSegment(file, offset, length))
+      } else {
+        None
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/4b3bb0e4/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala
deleted file mode 100644
index a1741e2..0000000
--- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.shuffle
-
-import java.io._
-import java.nio.ByteBuffer
-
-import com.google.common.io.ByteStreams
-
-import org.apache.spark.{SparkConf, SparkEnv}
-import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
-import org.apache.spark.network.netty.SparkTransportConf
-import org.apache.spark.storage._
-import org.apache.spark.util.Utils
-
-import IndexShuffleBlockManager.NOOP_REDUCE_ID
-
-/**
- * Create and maintain the shuffle blocks' mapping between logic block and physical file location.
- * Data of shuffle blocks from the same map task are stored in a single consolidated data file.
- * The offsets of the data blocks in the data file are stored in a separate index file.
- *
- * We use the name of the shuffle data's shuffleBlockId with reduce ID set to 0 and add ".data"
- * as the filename postfix for data file, and ".index" as the filename postfix for index file.
- *
- */
-// Note: Changes to the format in this file should be kept in sync with
-// org.apache.spark.network.shuffle.StandaloneShuffleBlockManager#getSortBasedShuffleBlockData().
-private[spark]
-class IndexShuffleBlockManager(conf: SparkConf) extends ShuffleBlockResolver {
-
-  private lazy val blockManager = SparkEnv.get.blockManager
-
-  private val transportConf = SparkTransportConf.fromSparkConf(conf)
-
-  def getDataFile(shuffleId: Int, mapId: Int): File = {
-    blockManager.diskBlockManager.getFile(ShuffleDataBlockId(shuffleId, mapId, NOOP_REDUCE_ID))
-  }
-
-  private def getIndexFile(shuffleId: Int, mapId: Int): File = {
-    blockManager.diskBlockManager.getFile(ShuffleIndexBlockId(shuffleId, mapId, NOOP_REDUCE_ID))
-  }
-
-  /**
-   * Remove data file and index file that contain the output data from one map.
-   * */
-  def removeDataByMap(shuffleId: Int, mapId: Int): Unit = {
-    var file = getDataFile(shuffleId, mapId)
-    if (file.exists()) {
-      file.delete()
-    }
-
-    file = getIndexFile(shuffleId, mapId)
-    if (file.exists()) {
-      file.delete()
-    }
-  }
-
-  /**
-   * Write an index file with the offsets of each block, plus a final offset at the end for the
-   * end of the output file. This will be used by getBlockLocation to figure out where each block
-   * begins and ends.
-   * */
-  def writeIndexFile(shuffleId: Int, mapId: Int, lengths: Array[Long]): Unit = {
-    val indexFile = getIndexFile(shuffleId, mapId)
-    val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexFile)))
-    Utils.tryWithSafeFinally {
-      // We take in lengths of each block, need to convert it to offsets.
-      var offset = 0L
-      out.writeLong(offset)
-      for (length <- lengths) {
-        offset += length
-        out.writeLong(offset)
-      }
-    } {
-      out.close()
-    }
-  }
-
-  override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = {
-    // The block is actually going to be a range of a single map output file for this map, so
-    // find out the consolidated file, then the offset within that from our index
-    val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId)
-
-    val in = new DataInputStream(new FileInputStream(indexFile))
-    try {
-      ByteStreams.skipFully(in, blockId.reduceId * 8)
-      val offset = in.readLong()
-      val nextOffset = in.readLong()
-      new FileSegmentManagedBuffer(
-        transportConf,
-        getDataFile(blockId.shuffleId, blockId.mapId),
-        offset,
-        nextOffset - offset)
-    } finally {
-      in.close()
-    }
-  }
-
-  override def stop(): Unit = {}
-}
-
-private[spark] object IndexShuffleBlockManager {
-  // No-op reduce ID used in interactions with disk store and BlockObjectWriter.
-  // The disk store currently expects puts to relate to a (map, reduce) pair, but in the sort
-  // shuffle outputs for several reduces are glommed into a single file.
-  // TODO: Avoid this entirely by having the DiskBlockObjectWriter not require a BlockId.
-  val NOOP_REDUCE_ID = 0
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/4b3bb0e4/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
new file mode 100644
index 0000000..d9c63b6
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle
+
+import java.io._
+
+import com.google.common.io.ByteStreams
+
+import org.apache.spark.{SparkConf, SparkEnv}
+import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
+import org.apache.spark.network.netty.SparkTransportConf
+import org.apache.spark.storage._
+import org.apache.spark.util.Utils
+
+import IndexShuffleBlockResolver.NOOP_REDUCE_ID
+
+/**
+ * Create and maintain the shuffle blocks' mapping between logical block and physical file location.
+ * Data of shuffle blocks from the same map task are stored in a single consolidated data file.
+ * The offsets of the data blocks in the data file are stored in a separate index file.
+ *
+ * We use the name of the shuffle data's shuffleBlockId with reduce ID set to 0 and add ".data"
+ * as the filename postfix for data file, and ".index" as the filename postfix for index file.
+ *
+ */
+// Note: Changes to the format in this file should be kept in sync with
+// org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getSortBasedShuffleBlockData().
+private[spark] class IndexShuffleBlockResolver(conf: SparkConf) extends ShuffleBlockResolver {
+
+  private lazy val blockManager = SparkEnv.get.blockManager
+
+  private val transportConf = SparkTransportConf.fromSparkConf(conf)
+
+  def getDataFile(shuffleId: Int, mapId: Int): File = {
+    blockManager.diskBlockManager.getFile(ShuffleDataBlockId(shuffleId, mapId, NOOP_REDUCE_ID))
+  }
+
+  private def getIndexFile(shuffleId: Int, mapId: Int): File = {
+    blockManager.diskBlockManager.getFile(ShuffleIndexBlockId(shuffleId, mapId, NOOP_REDUCE_ID))
+  }
+
+  /**
+   * Remove data file and index file that contain the output data from one map.
+   * */
+  def removeDataByMap(shuffleId: Int, mapId: Int): Unit = {
+    var file = getDataFile(shuffleId, mapId)
+    if (file.exists()) {
+      file.delete()
+    }
+
+    file = getIndexFile(shuffleId, mapId)
+    if (file.exists()) {
+      file.delete()
+    }
+  }
+
+  /**
+   * Write an index file with the offsets of each block, plus a final offset at the end for the
+   * end of the output file. This will be used by getBlockData to figure out where each block
+   * begins and ends.
+   * */
+  def writeIndexFile(shuffleId: Int, mapId: Int, lengths: Array[Long]): Unit = {
+    val indexFile = getIndexFile(shuffleId, mapId)
+    val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexFile)))
+    Utils.tryWithSafeFinally {
+      // We take in lengths of each block, need to convert it to offsets.
+      var offset = 0L
+      out.writeLong(offset)
+      for (length <- lengths) {
+        offset += length
+        out.writeLong(offset)
+      }
+    } {
+      out.close()
+    }
+  }
+
+  override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = {
+    // The block is actually going to be a range of a single map output file for this map, so
+    // find out the consolidated file, then the offset within that from our index
+    val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId)
+
+    val in = new DataInputStream(new FileInputStream(indexFile))
+    try {
+      ByteStreams.skipFully(in, blockId.reduceId * 8)
+      val offset = in.readLong()
+      val nextOffset = in.readLong()
+      new FileSegmentManagedBuffer(
+        transportConf,
+        getDataFile(blockId.shuffleId, blockId.mapId),
+        offset,
+        nextOffset - offset)
+    } finally {
+      in.close()
+    }
+  }
+
+  override def stop(): Unit = {}
+}
+
+private[spark] object IndexShuffleBlockResolver {
+  // No-op reduce ID used in interactions with disk store and BlockObjectWriter.
+  // The disk store currently expects puts to relate to a (map, reduce) pair, but in the sort
+  // shuffle, outputs for several reduces are glommed into a single file.
+  // TODO: Avoid this entirely by having the DiskBlockObjectWriter not require a BlockId.
+  val NOOP_REDUCE_ID = 0
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/4b3bb0e4/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala
index 2a7df8d..c089088 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala
@@ -26,7 +26,7 @@ import org.apache.spark.shuffle._
  */
 private[spark] class HashShuffleManager(conf: SparkConf) extends ShuffleManager {
 
-  private val fileShuffleBlockManager = new FileShuffleBlockManager(conf)
+  private val fileShuffleBlockResolver = new FileShuffleBlockResolver(conf)
 
   /* Register a shuffle with the manager and obtain a handle for it to pass to tasks. */
   override def registerShuffle[K, V, C](
@@ -61,8 +61,8 @@ private[spark] class HashShuffleManager(conf: SparkConf) extends ShuffleManager
     shuffleBlockResolver.removeShuffle(shuffleId)
   }
 
-  override def shuffleBlockResolver: FileShuffleBlockManager = {
-    fileShuffleBlockManager
+  override def shuffleBlockResolver: FileShuffleBlockResolver = {
+    fileShuffleBlockResolver
   }
 
   /** Shut down this ShuffleManager. */

http://git-wip-us.apache.org/repos/asf/spark/blob/4b3bb0e4/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
index cd27c9e..897f0a5 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
@@ -25,7 +25,7 @@ import org.apache.spark.shuffle._
 import org.apache.spark.storage.BlockObjectWriter
 
 private[spark] class HashShuffleWriter[K, V](
-    shuffleBlockManager: FileShuffleBlockManager,
+    shuffleBlockResolver: FileShuffleBlockResolver,
     handle: BaseShuffleHandle[K, V, _],
     mapId: Int,
     context: TaskContext)
@@ -45,7 +45,7 @@ private[spark] class HashShuffleWriter[K, V](
 
   private val blockManager = SparkEnv.get.blockManager
   private val ser = Serializer.getSerializer(dep.serializer.getOrElse(null))
-  private val shuffle = shuffleBlockManager.forMapTask(dep.shuffleId, mapId, numOutputSplits, ser,
+  private val shuffle = shuffleBlockResolver.forMapTask(dep.shuffleId, mapId, numOutputSplits, ser,
     writeMetrics)
 
   /** Write a bunch of records to this task's output */

http://git-wip-us.apache.org/repos/asf/spark/blob/4b3bb0e4/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
index 0497036..1584294 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
@@ -25,7 +25,7 @@ import org.apache.spark.shuffle.hash.HashShuffleReader
 
 private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager {
 
-  private val indexShuffleBlockManager = new IndexShuffleBlockManager(conf)
+  private val indexShuffleBlockResolver = new IndexShuffleBlockResolver(conf)
   private val shuffleMapNumber = new ConcurrentHashMap[Int, Int]()
 
   /**
@@ -72,8 +72,8 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager
     true
   }
 
-  override def shuffleBlockResolver: IndexShuffleBlockManager = {
-    indexShuffleBlockManager
+  override def shuffleBlockResolver: IndexShuffleBlockResolver = {
+    indexShuffleBlockResolver
   }
 
   /** Shut down this ShuffleManager. */

http://git-wip-us.apache.org/repos/asf/spark/blob/4b3bb0e4/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
index a066435..add2656 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
@@ -20,12 +20,12 @@ package org.apache.spark.shuffle.sort
 import org.apache.spark.{MapOutputTracker, SparkEnv, Logging, TaskContext}
 import org.apache.spark.executor.ShuffleWriteMetrics
 import org.apache.spark.scheduler.MapStatus
-import org.apache.spark.shuffle.{IndexShuffleBlockManager, ShuffleWriter, BaseShuffleHandle}
+import org.apache.spark.shuffle.{IndexShuffleBlockResolver, ShuffleWriter, BaseShuffleHandle}
 import org.apache.spark.storage.ShuffleBlockId
 import org.apache.spark.util.collection.ExternalSorter
 
 private[spark] class SortShuffleWriter[K, V, C](
-    shuffleBlockManager: IndexShuffleBlockManager,
+    shuffleBlockResolver: IndexShuffleBlockResolver,
     handle: BaseShuffleHandle[K, V, C],
     mapId: Int,
     context: TaskContext)
@@ -65,10 +65,10 @@ private[spark] class SortShuffleWriter[K, V, C](
     // Don't bother including the time to open the merged output file in the shuffle write time,
     // because it just opens a single file, so is typically too fast to measure accurately
     // (see SPARK-3570).
-    val outputFile = shuffleBlockManager.getDataFile(dep.shuffleId, mapId)
-    val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockManager.NOOP_REDUCE_ID)
+    val outputFile = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
+    val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
     val partitionLengths = sorter.writePartitionedFile(blockId, context, outputFile)
-    shuffleBlockManager.writeIndexFile(dep.shuffleId, mapId, partitionLengths)
+    shuffleBlockResolver.writeIndexFile(dep.shuffleId, mapId, partitionLengths)
 
     mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
   }
@@ -84,7 +84,7 @@ private[spark] class SortShuffleWriter[K, V, C](
         return Option(mapStatus)
       } else {
         // The map task failed, so delete our output data.
-        shuffleBlockManager.removeDataByMap(dep.shuffleId, mapId)
+        shuffleBlockResolver.removeDataByMap(dep.shuffleId, mapId)
         return None
       }
     } finally {

http://git-wip-us.apache.org/repos/asf/spark/blob/4b3bb0e4/core/src/main/scala/org/apache/spark/storage/BlockId.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockId.scala b/core/src/main/scala/org/apache/spark/storage/BlockId.scala
index c186fd3..524f697 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockId.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockId.scala
@@ -54,7 +54,7 @@ case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId {
 }
 
 // Format of the shuffle block ids (including data and index) should be kept in sync with
-// org.apache.spark.network.shuffle.StandaloneShuffleBlockManager#getBlockData().
+// org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getBlockData().
 @DeveloperApi
 case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int) extends BlockId {
   override def name: String = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId

http://git-wip-us.apache.org/repos/asf/spark/blob/4b3bb0e4/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index a46fecd..cc794e5 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -431,10 +431,11 @@ private[spark] class BlockManager(
     // As an optimization for map output fetches, if the block is for a shuffle, return it
     // without acquiring a lock; the disk store never deletes (recent) items so this should work
     if (blockId.isShuffle) {
-      val shuffleBlockManager = shuffleManager.shuffleBlockResolver
+      val shuffleBlockResolver = shuffleManager.shuffleBlockResolver
       // TODO: This should gracefully handle case where local block is not available. Currently
       // downstream code will throw an exception.
-      Option(shuffleBlockManager.getBlockData(blockId.asInstanceOf[ShuffleBlockId]).nioByteBuffer())
+      Option(
+        shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId]).nioByteBuffer())
     } else {
       doGetLocal(blockId, asBlockResult = false).asInstanceOf[Option[ByteBuffer]]
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/4b3bb0e4/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index 5764c16..2a44477 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -55,7 +55,7 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
 
   /** Looks up a file by hashing it into one of our local subdirectories. */
   // This method should be kept in sync with
-  // org.apache.spark.network.shuffle.StandaloneShuffleBlockManager#getFile().
+  // org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getFile().
   def getFile(filename: String): File = {
     // Figure out which local directory it hashes to, and which subdirectory in that
     val hash = Utils.nonNegativeHash(filename)

http://git-wip-us.apache.org/repos/asf/spark/blob/4b3bb0e4/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleManagerSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleManagerSuite.scala
index 84384bb..0537bf6 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleManagerSuite.scala
@@ -27,7 +27,7 @@ import org.apache.spark.{SparkEnv, SparkContext, LocalSparkContext, SparkConf}
 import org.apache.spark.executor.ShuffleWriteMetrics
 import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
 import org.apache.spark.serializer.JavaSerializer
-import org.apache.spark.shuffle.FileShuffleBlockManager
+import org.apache.spark.shuffle.FileShuffleBlockResolver
 import org.apache.spark.storage.{ShuffleBlockId, FileSegment}
 
 class HashShuffleManagerSuite extends FunSuite with LocalSparkContext {
@@ -53,10 +53,10 @@ class HashShuffleManagerSuite extends FunSuite with LocalSparkContext {
 
     sc = new SparkContext("local", "test", conf)
 
-    val shuffleBlockManager =
-      SparkEnv.get.shuffleManager.shuffleBlockResolver.asInstanceOf[FileShuffleBlockManager]
+    val shuffleBlockResolver =
+      SparkEnv.get.shuffleManager.shuffleBlockResolver.asInstanceOf[FileShuffleBlockResolver]
 
-    val shuffle1 = shuffleBlockManager.forMapTask(1, 1, 1, new JavaSerializer(conf),
+    val shuffle1 = shuffleBlockResolver.forMapTask(1, 1, 1, new JavaSerializer(conf),
       new ShuffleWriteMetrics)
     for (writer <- shuffle1.writers) {
       writer.write("test1", "value")
@@ -69,7 +69,7 @@ class HashShuffleManagerSuite extends FunSuite with LocalSparkContext {
     val shuffle1Segment = shuffle1.writers(0).fileSegment()
     shuffle1.releaseWriters(success = true)
 
-    val shuffle2 = shuffleBlockManager.forMapTask(1, 2, 1, new JavaSerializer(conf),
+    val shuffle2 = shuffleBlockResolver.forMapTask(1, 2, 1, new JavaSerializer(conf),
       new ShuffleWriteMetrics)
 
     for (writer <- shuffle2.writers) {
@@ -88,7 +88,7 @@ class HashShuffleManagerSuite extends FunSuite with LocalSparkContext {
     // of block based on remaining data in file : which could mess things up when there is
     // concurrent read and writes happening to the same shuffle group.
 
-    val shuffle3 = shuffleBlockManager.forMapTask(1, 3, 1, new JavaSerializer(testConf),
+    val shuffle3 = shuffleBlockResolver.forMapTask(1, 3, 1, new JavaSerializer(testConf),
       new ShuffleWriteMetrics)
     for (writer <- shuffle3.writers) {
       writer.write("test3", "value")
@@ -98,10 +98,10 @@ class HashShuffleManagerSuite extends FunSuite with LocalSparkContext {
       writer.commitAndClose()
     }
     // check before we register.
-    checkSegments(shuffle2Segment, shuffleBlockManager.getBlockData(ShuffleBlockId(1, 2, 0)))
+    checkSegments(shuffle2Segment, shuffleBlockResolver.getBlockData(ShuffleBlockId(1, 2, 0)))
     shuffle3.releaseWriters(success = true)
-    checkSegments(shuffle2Segment, shuffleBlockManager.getBlockData(ShuffleBlockId(1, 2, 0)))
-    shuffleBlockManager.removeShuffle(1)
+    checkSegments(shuffle2Segment, shuffleBlockResolver.getBlockData(ShuffleBlockId(1, 2, 0)))
+    shuffleBlockResolver.removeShuffle(1)
   }
 
   def writeToFile(file: File, numBytes: Int) {

http://git-wip-us.apache.org/repos/asf/spark/blob/4b3bb0e4/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
----------------------------------------------------------------------
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
index 46ca970..e4faaf8 100644
--- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
+++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
@@ -46,18 +46,18 @@ import org.apache.spark.network.shuffle.protocol.StreamHandle;
 public class ExternalShuffleBlockHandler extends RpcHandler {
   private final Logger logger = LoggerFactory.getLogger(ExternalShuffleBlockHandler.class);
 
-  private final ExternalShuffleBlockManager blockManager;
+  private final ExternalShuffleBlockResolver blockManager;
   private final OneForOneStreamManager streamManager;
 
   public ExternalShuffleBlockHandler(TransportConf conf) {
-    this(new OneForOneStreamManager(), new ExternalShuffleBlockManager(conf));
+    this(new OneForOneStreamManager(), new ExternalShuffleBlockResolver(conf));
   }
 
   /** Enables mocking out the StreamManager and BlockManager. */
   @VisibleForTesting
   ExternalShuffleBlockHandler(
       OneForOneStreamManager streamManager,
-      ExternalShuffleBlockManager blockManager) {
+      ExternalShuffleBlockResolver blockManager) {
     this.streamManager = streamManager;
     this.blockManager = blockManager;
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/4b3bb0e4/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManager.java
----------------------------------------------------------------------
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManager.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManager.java
deleted file mode 100644
index 93e6fdd..0000000
--- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManager.java
+++ /dev/null
@@ -1,254 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.shuffle;
-
-import java.io.DataInputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Objects;
-import com.google.common.collect.Maps;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.spark.network.buffer.FileSegmentManagedBuffer;
-import org.apache.spark.network.buffer.ManagedBuffer;
-import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
-import org.apache.spark.network.util.JavaUtils;
-import org.apache.spark.network.util.NettyUtils;
-import org.apache.spark.network.util.TransportConf;
-
-/**
- * Manages converting shuffle BlockIds into physical segments of local files, from a process outside
- * of Executors. Each Executor must register its own configuration about where it stores its files
- * (local dirs) and how (shuffle manager). The logic for retrieval of individual files is replicated
- * from Spark's FileShuffleBlockManager and IndexShuffleBlockManager.
- *
- * Executors with shuffle file consolidation are not currently supported, as the index is stored in
- * the Executor's memory, unlike the IndexShuffleBlockManager.
- */
-public class ExternalShuffleBlockManager {
-  private static final Logger logger = LoggerFactory.getLogger(ExternalShuffleBlockManager.class);
-
-  // Map containing all registered executors' metadata.
-  private final ConcurrentMap<AppExecId, ExecutorShuffleInfo> executors;
-
-  // Single-threaded Java executor used to perform expensive recursive directory deletion.
-  private final Executor directoryCleaner;
-
-  private final TransportConf conf;
-
-  public ExternalShuffleBlockManager(TransportConf conf) {
-    this(conf, Executors.newSingleThreadExecutor(
-        // Add `spark` prefix because it will run in NM in Yarn mode.
-        NettyUtils.createThreadFactory("spark-shuffle-directory-cleaner")));
-  }
-
-  // Allows tests to have more control over when directories are cleaned up.
-  @VisibleForTesting
-  ExternalShuffleBlockManager(TransportConf conf, Executor directoryCleaner) {
-    this.conf = conf;
-    this.executors = Maps.newConcurrentMap();
-    this.directoryCleaner = directoryCleaner;
-  }
-
-  /** Registers a new Executor with all the configuration we need to find its shuffle files. */
-  public void registerExecutor(
-      String appId,
-      String execId,
-      ExecutorShuffleInfo executorInfo) {
-    AppExecId fullId = new AppExecId(appId, execId);
-    logger.info("Registered executor {} with {}", fullId, executorInfo);
-    executors.put(fullId, executorInfo);
-  }
-
-  /**
-   * Obtains a FileSegmentManagedBuffer from a shuffle block id. We expect the blockId has the
-   * format "shuffle_ShuffleId_MapId_ReduceId" (from ShuffleBlockId), and additionally make
-   * assumptions about how the hash and sort based shuffles store their data.
-   */
-  public ManagedBuffer getBlockData(String appId, String execId, String blockId) {
-    String[] blockIdParts = blockId.split("_");
-    if (blockIdParts.length < 4) {
-      throw new IllegalArgumentException("Unexpected block id format: " + blockId);
-    } else if (!blockIdParts[0].equals("shuffle")) {
-      throw new IllegalArgumentException("Expected shuffle block id, got: " + blockId);
-    }
-    int shuffleId = Integer.parseInt(blockIdParts[1]);
-    int mapId = Integer.parseInt(blockIdParts[2]);
-    int reduceId = Integer.parseInt(blockIdParts[3]);
-
-    ExecutorShuffleInfo executor = executors.get(new AppExecId(appId, execId));
-    if (executor == null) {
-      throw new RuntimeException(
-        String.format("Executor is not registered (appId=%s, execId=%s)", appId, execId));
-    }
-
-    if ("org.apache.spark.shuffle.hash.HashShuffleManager".equals(executor.shuffleManager)) {
-      return getHashBasedShuffleBlockData(executor, blockId);
-    } else if ("org.apache.spark.shuffle.sort.SortShuffleManager".equals(executor.shuffleManager)) {
-      return getSortBasedShuffleBlockData(executor, shuffleId, mapId, reduceId);
-    } else {
-      throw new UnsupportedOperationException(
-        "Unsupported shuffle manager: " + executor.shuffleManager);
-    }
-  }
-
-  /**
-   * Removes our metadata of all executors registered for the given application, and optionally
-   * also deletes the local directories associated with the executors of that application in a
-   * separate thread.
-   *
-   * It is not valid to call registerExecutor() for an executor with this appId after invoking
-   * this method.
-   */
-  public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
-    logger.info("Application {} removed, cleanupLocalDirs = {}", appId, cleanupLocalDirs);
-    Iterator<Map.Entry<AppExecId, ExecutorShuffleInfo>> it = executors.entrySet().iterator();
-    while (it.hasNext()) {
-      Map.Entry<AppExecId, ExecutorShuffleInfo> entry = it.next();
-      AppExecId fullId = entry.getKey();
-      final ExecutorShuffleInfo executor = entry.getValue();
-
-      // Only touch executors associated with the appId that was removed.
-      if (appId.equals(fullId.appId)) {
-        it.remove();
-
-        if (cleanupLocalDirs) {
-          logger.info("Cleaning up executor {}'s {} local dirs", fullId, executor.localDirs.length);
-
-          // Execute the actual deletion in a different thread, as it may take some time.
-          directoryCleaner.execute(new Runnable() {
-            @Override
-            public void run() {
-              deleteExecutorDirs(executor.localDirs);
-            }
-          });
-        }
-      }
-    }
-  }
-
-  /**
-   * Synchronously deletes each directory one at a time.
-   * Should be executed in its own thread, as this may take a long time.
-   */
-  private void deleteExecutorDirs(String[] dirs) {
-    for (String localDir : dirs) {
-      try {
-        JavaUtils.deleteRecursively(new File(localDir));
-        logger.debug("Successfully cleaned up directory: " + localDir);
-      } catch (Exception e) {
-        logger.error("Failed to delete directory: " + localDir, e);
-      }
-    }
-  }
-
-  /**
-   * Hash-based shuffle data is simply stored as one file per block.
-   * This logic is from FileShuffleBlockManager.
-   */
-  // TODO: Support consolidated hash shuffle files
-  private ManagedBuffer getHashBasedShuffleBlockData(ExecutorShuffleInfo executor, String blockId) {
-    File shuffleFile = getFile(executor.localDirs, executor.subDirsPerLocalDir, blockId);
-    return new FileSegmentManagedBuffer(conf, shuffleFile, 0, shuffleFile.length());
-  }
-
-  /**
-   * Sort-based shuffle data uses an index called "shuffle_ShuffleId_MapId_0.index" into a data file
-   * called "shuffle_ShuffleId_MapId_0.data". This logic is from IndexShuffleBlockManager,
-   * and the block id format is from ShuffleDataBlockId and ShuffleIndexBlockId.
-   */
-  private ManagedBuffer getSortBasedShuffleBlockData(
-    ExecutorShuffleInfo executor, int shuffleId, int mapId, int reduceId) {
-    File indexFile = getFile(executor.localDirs, executor.subDirsPerLocalDir,
-      "shuffle_" + shuffleId + "_" + mapId + "_0.index");
-
-    DataInputStream in = null;
-    try {
-      in = new DataInputStream(new FileInputStream(indexFile));
-      in.skipBytes(reduceId * 8);
-      long offset = in.readLong();
-      long nextOffset = in.readLong();
-      return new FileSegmentManagedBuffer(
-        conf,
-        getFile(executor.localDirs, executor.subDirsPerLocalDir,
-          "shuffle_" + shuffleId + "_" + mapId + "_0.data"),
-        offset,
-        nextOffset - offset);
-    } catch (IOException e) {
-      throw new RuntimeException("Failed to open file: " + indexFile, e);
-    } finally {
-      if (in != null) {
-        JavaUtils.closeQuietly(in);
-      }
-    }
-  }
-
-  /**
-   * Hashes a filename into the corresponding local directory, in a manner consistent with
-   * Spark's DiskBlockManager.getFile().
-   */
-  @VisibleForTesting
-  static File getFile(String[] localDirs, int subDirsPerLocalDir, String filename) {
-    int hash = JavaUtils.nonNegativeHash(filename);
-    String localDir = localDirs[hash % localDirs.length];
-    int subDirId = (hash / localDirs.length) % subDirsPerLocalDir;
-    return new File(new File(localDir, String.format("%02x", subDirId)), filename);
-  }
-
-  /** Simply encodes an executor's full ID, which is appId + execId. */
-  private static class AppExecId {
-    final String appId;
-    final String execId;
-
-    private AppExecId(String appId, String execId) {
-      this.appId = appId;
-      this.execId = execId;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (this == o) return true;
-      if (o == null || getClass() != o.getClass()) return false;
-
-      AppExecId appExecId = (AppExecId) o;
-      return Objects.equal(appId, appExecId.appId) && Objects.equal(execId, appExecId.execId);
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hashCode(appId, execId);
-    }
-
-    @Override
-    public String toString() {
-      return Objects.toStringHelper(this)
-        .add("appId", appId)
-        .add("execId", execId)
-        .toString();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/4b3bb0e4/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
----------------------------------------------------------------------
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
new file mode 100644
index 0000000..dd08e24
--- /dev/null
+++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Objects;
+import com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.spark.network.buffer.FileSegmentManagedBuffer;
+import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
+import org.apache.spark.network.util.JavaUtils;
+import org.apache.spark.network.util.NettyUtils;
+import org.apache.spark.network.util.TransportConf;
+
+/**
+ * Manages converting shuffle BlockIds into physical segments of local files, from a process outside
+ * of Executors. Each Executor must register its own configuration about where it stores its files
+ * (local dirs) and how (shuffle manager). The logic for retrieval of individual files is replicated
+ * from Spark's FileShuffleBlockResolver and IndexShuffleBlockResolver.
+ *
+ * Executors with shuffle file consolidation are not currently supported, as the index is stored in
+ * the Executor's memory, unlike the IndexShuffleBlockResolver.
+ */
+public class ExternalShuffleBlockResolver {
+  private static final Logger logger = LoggerFactory.getLogger(ExternalShuffleBlockResolver.class);
+
+  // Map containing all registered executors' metadata.
+  private final ConcurrentMap<AppExecId, ExecutorShuffleInfo> executors;
+
+  // Single-threaded Java executor used to perform expensive recursive directory deletion.
+  private final Executor directoryCleaner;
+
+  private final TransportConf conf;
+
+  public ExternalShuffleBlockResolver(TransportConf conf) {
+    this(conf, Executors.newSingleThreadExecutor(
+        // Add `spark` prefix because it will run in NM in Yarn mode.
+        NettyUtils.createThreadFactory("spark-shuffle-directory-cleaner")));
+  }
+
+  // Allows tests to have more control over when directories are cleaned up.
+  @VisibleForTesting
+  ExternalShuffleBlockResolver(TransportConf conf, Executor directoryCleaner) {
+    this.conf = conf;
+    this.executors = Maps.newConcurrentMap();
+    this.directoryCleaner = directoryCleaner;
+  }
+
+  /** Registers a new Executor with all the configuration we need to find its shuffle files. */
+  public void registerExecutor(
+      String appId,
+      String execId,
+      ExecutorShuffleInfo executorInfo) {
+    AppExecId fullId = new AppExecId(appId, execId);
+    logger.info("Registered executor {} with {}", fullId, executorInfo);
+    executors.put(fullId, executorInfo);
+  }
+
+  /**
+   * Obtains a FileSegmentManagedBuffer from a shuffle block id. We expect the blockId has the
+   * format "shuffle_ShuffleId_MapId_ReduceId" (from ShuffleBlockId), and additionally make
+   * assumptions about how the hash and sort based shuffles store their data.
+   */
+  public ManagedBuffer getBlockData(String appId, String execId, String blockId) {
+    String[] blockIdParts = blockId.split("_");
+    if (blockIdParts.length < 4) {
+      throw new IllegalArgumentException("Unexpected block id format: " + blockId);
+    } else if (!blockIdParts[0].equals("shuffle")) {
+      throw new IllegalArgumentException("Expected shuffle block id, got: " + blockId);
+    }
+    int shuffleId = Integer.parseInt(blockIdParts[1]);
+    int mapId = Integer.parseInt(blockIdParts[2]);
+    int reduceId = Integer.parseInt(blockIdParts[3]);
+
+    ExecutorShuffleInfo executor = executors.get(new AppExecId(appId, execId));
+    if (executor == null) {
+      throw new RuntimeException(
+        String.format("Executor is not registered (appId=%s, execId=%s)", appId, execId));
+    }
+
+    if ("org.apache.spark.shuffle.hash.HashShuffleManager".equals(executor.shuffleManager)) {
+      return getHashBasedShuffleBlockData(executor, blockId);
+    } else if ("org.apache.spark.shuffle.sort.SortShuffleManager".equals(executor.shuffleManager)) {
+      return getSortBasedShuffleBlockData(executor, shuffleId, mapId, reduceId);
+    } else {
+      throw new UnsupportedOperationException(
+        "Unsupported shuffle manager: " + executor.shuffleManager);
+    }
+  }
+
+  /**
+   * Removes our metadata of all executors registered for the given application, and optionally
+   * also deletes the local directories associated with the executors of that application in a
+   * separate thread.
+   *
+   * It is not valid to call registerExecutor() for an executor with this appId after invoking
+   * this method.
+   */
+  public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
+    logger.info("Application {} removed, cleanupLocalDirs = {}", appId, cleanupLocalDirs);
+    Iterator<Map.Entry<AppExecId, ExecutorShuffleInfo>> it = executors.entrySet().iterator();
+    while (it.hasNext()) {
+      Map.Entry<AppExecId, ExecutorShuffleInfo> entry = it.next();
+      AppExecId fullId = entry.getKey();
+      final ExecutorShuffleInfo executor = entry.getValue();
+
+      // Only touch executors associated with the appId that was removed.
+      if (appId.equals(fullId.appId)) {
+        it.remove();
+
+        if (cleanupLocalDirs) {
+          logger.info("Cleaning up executor {}'s {} local dirs", fullId, executor.localDirs.length);
+
+          // Execute the actual deletion in a different thread, as it may take some time.
+          directoryCleaner.execute(new Runnable() {
+            @Override
+            public void run() {
+              deleteExecutorDirs(executor.localDirs);
+            }
+          });
+        }
+      }
+    }
+  }
+
+  /**
+   * Synchronously deletes each directory one at a time.
+   * Should be executed in its own thread, as this may take a long time.
+   */
+  private void deleteExecutorDirs(String[] dirs) {
+    for (String localDir : dirs) {
+      try {
+        JavaUtils.deleteRecursively(new File(localDir));
+        logger.debug("Successfully cleaned up directory: " + localDir);
+      } catch (Exception e) {
+        logger.error("Failed to delete directory: " + localDir, e);
+      }
+    }
+  }
+
+  /**
+   * Hash-based shuffle data is simply stored as one file per block.
+   * This logic is from FileShuffleBlockResolver.
+   */
+  // TODO: Support consolidated hash shuffle files
+  private ManagedBuffer getHashBasedShuffleBlockData(ExecutorShuffleInfo executor, String blockId) {
+    File shuffleFile = getFile(executor.localDirs, executor.subDirsPerLocalDir, blockId);
+    return new FileSegmentManagedBuffer(conf, shuffleFile, 0, shuffleFile.length());
+  }
+
+  /**
+   * Sort-based shuffle data uses an index called "shuffle_ShuffleId_MapId_0.index" into a data file
+   * called "shuffle_ShuffleId_MapId_0.data". This logic is from IndexShuffleBlockResolver,
+   * and the block id format is from ShuffleDataBlockId and ShuffleIndexBlockId.
+   */
+  private ManagedBuffer getSortBasedShuffleBlockData(
+    ExecutorShuffleInfo executor, int shuffleId, int mapId, int reduceId) {
+    File indexFile = getFile(executor.localDirs, executor.subDirsPerLocalDir,
+      "shuffle_" + shuffleId + "_" + mapId + "_0.index");
+
+    DataInputStream in = null;
+    try {
+      in = new DataInputStream(new FileInputStream(indexFile));
+      in.skipBytes(reduceId * 8);
+      long offset = in.readLong();
+      long nextOffset = in.readLong();
+      return new FileSegmentManagedBuffer(
+        conf,
+        getFile(executor.localDirs, executor.subDirsPerLocalDir,
+          "shuffle_" + shuffleId + "_" + mapId + "_0.data"),
+        offset,
+        nextOffset - offset);
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to open file: " + indexFile, e);
+    } finally {
+      if (in != null) {
+        JavaUtils.closeQuietly(in);
+      }
+    }
+  }
+
+  /**
+   * Hashes a filename into the corresponding local directory, in a manner consistent with
+   * Spark's DiskBlockManager.getFile().
+   */
+  @VisibleForTesting
+  static File getFile(String[] localDirs, int subDirsPerLocalDir, String filename) {
+    int hash = JavaUtils.nonNegativeHash(filename);
+    String localDir = localDirs[hash % localDirs.length];
+    int subDirId = (hash / localDirs.length) % subDirsPerLocalDir;
+    return new File(new File(localDir, String.format("%02x", subDirId)), filename);
+  }
+
+  /** Simply encodes an executor's full ID, which is appId + execId. */
+  private static class AppExecId {
+    final String appId;
+    final String execId;
+
+    private AppExecId(String appId, String execId) {
+      this.appId = appId;
+      this.execId = execId;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+
+      AppExecId appExecId = (AppExecId) o;
+      return Objects.equal(appId, appExecId.appId) && Objects.equal(execId, appExecId.execId);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(appId, execId);
+    }
+
+    @Override
+    public String toString() {
+      return Objects.toStringHelper(this)
+        .add("appId", appId)
+        .add("execId", execId)
+        .toString();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/4b3bb0e4/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java
----------------------------------------------------------------------
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java
index 3f9fe16..73374cd 100644
--- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java
+++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java
@@ -45,14 +45,14 @@ public class ExternalShuffleBlockHandlerSuite {
   TransportClient client = mock(TransportClient.class);
 
   OneForOneStreamManager streamManager;
-  ExternalShuffleBlockManager blockManager;
+  ExternalShuffleBlockResolver blockResolver;
   RpcHandler handler;
 
   @Before
   public void beforeEach() {
     streamManager = mock(OneForOneStreamManager.class);
-    blockManager = mock(ExternalShuffleBlockManager.class);
-    handler = new ExternalShuffleBlockHandler(streamManager, blockManager);
+    blockResolver = mock(ExternalShuffleBlockResolver.class);
+    handler = new ExternalShuffleBlockHandler(streamManager, blockResolver);
   }
 
   @Test
@@ -62,7 +62,7 @@ public class ExternalShuffleBlockHandlerSuite {
     ExecutorShuffleInfo config = new ExecutorShuffleInfo(new String[] {"/a", "/b"}, 16, "sort");
     byte[] registerMessage = new RegisterExecutor("app0", "exec1", config).toByteArray();
     handler.receive(client, registerMessage, callback);
-    verify(blockManager, times(1)).registerExecutor("app0", "exec1", config);
+    verify(blockResolver, times(1)).registerExecutor("app0", "exec1", config);
 
     verify(callback, times(1)).onSuccess((byte[]) any());
     verify(callback, never()).onFailure((Throwable) any());
@@ -75,12 +75,12 @@ public class ExternalShuffleBlockHandlerSuite {
 
     ManagedBuffer block0Marker = new NioManagedBuffer(ByteBuffer.wrap(new byte[3]));
     ManagedBuffer block1Marker = new NioManagedBuffer(ByteBuffer.wrap(new byte[7]));
-    when(blockManager.getBlockData("app0", "exec1", "b0")).thenReturn(block0Marker);
-    when(blockManager.getBlockData("app0", "exec1", "b1")).thenReturn(block1Marker);
+    when(blockResolver.getBlockData("app0", "exec1", "b0")).thenReturn(block0Marker);
+    when(blockResolver.getBlockData("app0", "exec1", "b1")).thenReturn(block1Marker);
     byte[] openBlocks = new OpenBlocks("app0", "exec1", new String[] { "b0", "b1" }).toByteArray();
     handler.receive(client, openBlocks, callback);
-    verify(blockManager, times(1)).getBlockData("app0", "exec1", "b0");
-    verify(blockManager, times(1)).getBlockData("app0", "exec1", "b1");
+    verify(blockResolver, times(1)).getBlockData("app0", "exec1", "b0");
+    verify(blockResolver, times(1)).getBlockData("app0", "exec1", "b1");
 
     ArgumentCaptor<byte[]> response = ArgumentCaptor.forClass(byte[].class);
     verify(callback, times(1)).onSuccess(response.capture());

http://git-wip-us.apache.org/repos/asf/spark/blob/4b3bb0e4/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManagerSuite.java
----------------------------------------------------------------------
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManagerSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManagerSuite.java
deleted file mode 100644
index dad6428..0000000
--- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManagerSuite.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.shuffle;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-
-import com.google.common.io.CharStreams;
-import org.apache.spark.network.util.SystemPropertyConfigProvider;
-import org.apache.spark.network.util.TransportConf;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-public class ExternalShuffleBlockManagerSuite {
-  static String sortBlock0 = "Hello!";
-  static String sortBlock1 = "World!";
-
-  static String hashBlock0 = "Elementary";
-  static String hashBlock1 = "Tabular";
-
-  static TestShuffleDataContext dataContext;
-
-  static TransportConf conf = new TransportConf(new SystemPropertyConfigProvider());
-
-  @BeforeClass
-  public static void beforeAll() throws IOException {
-    dataContext = new TestShuffleDataContext(2, 5);
-
-    dataContext.create();
-    // Write some sort and hash data.
-    dataContext.insertSortShuffleData(0, 0,
-      new byte[][] { sortBlock0.getBytes(), sortBlock1.getBytes() } );
-    dataContext.insertHashShuffleData(1, 0,
-      new byte[][] { hashBlock0.getBytes(), hashBlock1.getBytes() } );
-  }
-
-  @AfterClass
-  public static void afterAll() {
-    dataContext.cleanup();
-  }
-
-  @Test
-  public void testBadRequests() {
-    ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(conf);
-    // Unregistered executor
-    try {
-      manager.getBlockData("app0", "exec1", "shuffle_1_1_0");
-      fail("Should have failed");
-    } catch (RuntimeException e) {
-      assertTrue("Bad error message: " + e, e.getMessage().contains("not registered"));
-    }
-
-    // Invalid shuffle manager
-    manager.registerExecutor("app0", "exec2", dataContext.createExecutorInfo("foobar"));
-    try {
-      manager.getBlockData("app0", "exec2", "shuffle_1_1_0");
-      fail("Should have failed");
-    } catch (UnsupportedOperationException e) {
-      // pass
-    }
-
-    // Nonexistent shuffle block
-    manager.registerExecutor("app0", "exec3",
-      dataContext.createExecutorInfo("org.apache.spark.shuffle.sort.SortShuffleManager"));
-    try {
-      manager.getBlockData("app0", "exec3", "shuffle_1_1_0");
-      fail("Should have failed");
-    } catch (Exception e) {
-      // pass
-    }
-  }
-
-  @Test
-  public void testSortShuffleBlocks() throws IOException {
-    ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(conf);
-    manager.registerExecutor("app0", "exec0",
-      dataContext.createExecutorInfo("org.apache.spark.shuffle.sort.SortShuffleManager"));
-
-    InputStream block0Stream =
-      manager.getBlockData("app0", "exec0", "shuffle_0_0_0").createInputStream();
-    String block0 = CharStreams.toString(new InputStreamReader(block0Stream));
-    block0Stream.close();
-    assertEquals(sortBlock0, block0);
-
-    InputStream block1Stream =
-      manager.getBlockData("app0", "exec0", "shuffle_0_0_1").createInputStream();
-    String block1 = CharStreams.toString(new InputStreamReader(block1Stream));
-    block1Stream.close();
-    assertEquals(sortBlock1, block1);
-  }
-
-  @Test
-  public void testHashShuffleBlocks() throws IOException {
-    ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(conf);
-    manager.registerExecutor("app0", "exec0",
-      dataContext.createExecutorInfo("org.apache.spark.shuffle.hash.HashShuffleManager"));
-
-    InputStream block0Stream =
-      manager.getBlockData("app0", "exec0", "shuffle_1_0_0").createInputStream();
-    String block0 = CharStreams.toString(new InputStreamReader(block0Stream));
-    block0Stream.close();
-    assertEquals(hashBlock0, block0);
-
-    InputStream block1Stream =
-      manager.getBlockData("app0", "exec0", "shuffle_1_0_1").createInputStream();
-    String block1 = CharStreams.toString(new InputStreamReader(block1Stream));
-    block1Stream.close();
-    assertEquals(hashBlock1, block1);
-  }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org