Posted to commits@spark.apache.org by jo...@apache.org on 2015/05/08 21:30:40 UTC
[2/2] spark git commit: [SPARK-6627] Finished rename to ShuffleBlockResolver
[SPARK-6627] Finished rename to ShuffleBlockResolver
The previous cleanup commit for SPARK-6627 renamed ShuffleBlockManager
to ShuffleBlockResolver, but didn't rename the associated subclasses and
variables; this commit does that.
I'm unsure whether it's ok to rename ExternalShuffleBlockManager, since that's technically a public class?
cc pwendell
Author: Kay Ousterhout <ka...@gmail.com>
Closes #5764 from kayousterhout/SPARK-6627 and squashes the following commits:
43add1e [Kay Ousterhout] Spacing fix
96080bf [Kay Ousterhout] Test fixes
d8a5d36 [Kay Ousterhout] [SPARK-6627] Finished rename to ShuffleBlockResolver
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4b3bb0e4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4b3bb0e4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4b3bb0e4
Branch: refs/heads/master
Commit: 4b3bb0e43ca7e1a27308516608419487b6a844e6
Parents: 2d05f32
Author: Kay Ousterhout <ka...@gmail.com>
Authored: Fri May 8 12:24:06 2015 -0700
Committer: Josh Rosen <jo...@databricks.com>
Committed: Fri May 8 12:24:06 2015 -0700
----------------------------------------------------------------------
.../spark/shuffle/FileShuffleBlockManager.scala | 300 -------------------
.../shuffle/FileShuffleBlockResolver.scala | 297 ++++++++++++++++++
.../shuffle/IndexShuffleBlockManager.scala | 124 --------
.../shuffle/IndexShuffleBlockResolver.scala | 122 ++++++++
.../spark/shuffle/hash/HashShuffleManager.scala | 6 +-
.../spark/shuffle/hash/HashShuffleWriter.scala | 4 +-
.../spark/shuffle/sort/SortShuffleManager.scala | 6 +-
.../spark/shuffle/sort/SortShuffleWriter.scala | 12 +-
.../org/apache/spark/storage/BlockId.scala | 2 +-
.../org/apache/spark/storage/BlockManager.scala | 5 +-
.../apache/spark/storage/DiskBlockManager.scala | 2 +-
.../shuffle/hash/HashShuffleManagerSuite.scala | 18 +-
.../shuffle/ExternalShuffleBlockHandler.java | 6 +-
.../shuffle/ExternalShuffleBlockManager.java | 254 ----------------
.../shuffle/ExternalShuffleBlockResolver.java | 254 ++++++++++++++++
.../ExternalShuffleBlockHandlerSuite.java | 16 +-
.../ExternalShuffleBlockManagerSuite.java | 129 --------
.../ExternalShuffleBlockResolverSuite.java | 129 ++++++++
.../shuffle/ExternalShuffleCleanupSuite.java | 37 +--
.../network/shuffle/TestShuffleDataContext.java | 8 +-
20 files changed, 865 insertions(+), 866 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/4b3bb0e4/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala
deleted file mode 100644
index e9b4e2b..0000000
--- a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala
+++ /dev/null
@@ -1,300 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.shuffle
-
-import java.io.File
-import java.nio.ByteBuffer
-import java.util.concurrent.ConcurrentLinkedQueue
-import java.util.concurrent.atomic.AtomicInteger
-
-import scala.collection.JavaConversions._
-
-import org.apache.spark.{Logging, SparkConf, SparkEnv}
-import org.apache.spark.executor.ShuffleWriteMetrics
-import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
-import org.apache.spark.network.netty.SparkTransportConf
-import org.apache.spark.serializer.Serializer
-import org.apache.spark.shuffle.FileShuffleBlockManager.ShuffleFileGroup
-import org.apache.spark.storage._
-import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, TimeStampedHashMap}
-import org.apache.spark.util.collection.{PrimitiveKeyOpenHashMap, PrimitiveVector}
-
-/** A group of writers for a ShuffleMapTask, one writer per reducer. */
-private[spark] trait ShuffleWriterGroup {
- val writers: Array[BlockObjectWriter]
-
- /** @param success Indicates all writes were successful. If false, no blocks will be recorded. */
- def releaseWriters(success: Boolean)
-}
-
-/**
- * Manages assigning disk-based block writers to shuffle tasks. Each shuffle task gets one file
- * per reducer (this set of files is called a ShuffleFileGroup).
- *
- * As an optimization to reduce the number of physical shuffle files produced, multiple shuffle
- * blocks are aggregated into the same file. There is one "combined shuffle file" per reducer
- * per concurrently executing shuffle task. As soon as a task finishes writing to its shuffle
- * files, it releases them for another task.
- * Regarding the implementation of this feature, shuffle files are identified by a 3-tuple:
- * - shuffleId: The unique id given to the entire shuffle stage.
- * - bucketId: The id of the output partition (i.e., reducer id)
- * - fileId: The unique id identifying a group of "combined shuffle files." Only one task at a
- * time owns a particular fileId, and this id is returned to a pool when the task finishes.
- * Each shuffle file is then mapped to a FileSegment, which is a 3-tuple (file, offset, length)
- * that specifies where in a given file the actual block data is located.
- *
- * Shuffle file metadata is stored in a space-efficient manner. Rather than simply mapping
- * ShuffleBlockIds directly to FileSegments, each ShuffleFileGroup maintains a list of offsets for
- * each block stored in each file. In order to find the location of a shuffle block, we search the
- * files within the ShuffleFileGroups associated with the block's reducer.
- */
-// Note: Changes to the format in this file should be kept in sync with
-// org.apache.spark.network.shuffle.StandaloneShuffleBlockManager#getHashBasedShuffleBlockData().
-private[spark]
-class FileShuffleBlockManager(conf: SparkConf)
- extends ShuffleBlockResolver with Logging {
-
- private val transportConf = SparkTransportConf.fromSparkConf(conf)
-
- private lazy val blockManager = SparkEnv.get.blockManager
-
- // Turning off shuffle file consolidation causes all shuffle Blocks to get their own file.
- // TODO: Remove this once the shuffle file consolidation feature is stable.
- private val consolidateShuffleFiles =
- conf.getBoolean("spark.shuffle.consolidateFiles", false)
-
- // Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
- private val bufferSize = conf.getSizeAsKb("spark.shuffle.file.buffer", "32k").toInt * 1024
-
- /**
- * Contains all the state related to a particular shuffle. This includes a pool of unused
- * ShuffleFileGroups, as well as all ShuffleFileGroups that have been created for the shuffle.
- */
- private class ShuffleState(val numBuckets: Int) {
- val nextFileId = new AtomicInteger(0)
- val unusedFileGroups = new ConcurrentLinkedQueue[ShuffleFileGroup]()
- val allFileGroups = new ConcurrentLinkedQueue[ShuffleFileGroup]()
-
- /**
- * The mapIds of all map tasks completed on this Executor for this shuffle.
- * NB: This is only populated if consolidateShuffleFiles is FALSE. We don't need it otherwise.
- */
- val completedMapTasks = new ConcurrentLinkedQueue[Int]()
- }
-
- private val shuffleStates = new TimeStampedHashMap[ShuffleId, ShuffleState]
-
- private val metadataCleaner =
- new MetadataCleaner(MetadataCleanerType.SHUFFLE_BLOCK_MANAGER, this.cleanup, conf)
-
- /**
- * Get a ShuffleWriterGroup for the given map task, which will register it as complete
- * when the writers are closed successfully
- */
- def forMapTask(shuffleId: Int, mapId: Int, numBuckets: Int, serializer: Serializer,
- writeMetrics: ShuffleWriteMetrics): ShuffleWriterGroup = {
- new ShuffleWriterGroup {
- shuffleStates.putIfAbsent(shuffleId, new ShuffleState(numBuckets))
- private val shuffleState = shuffleStates(shuffleId)
- private var fileGroup: ShuffleFileGroup = null
-
- val openStartTime = System.nanoTime
- val serializerInstance = serializer.newInstance()
- val writers: Array[BlockObjectWriter] = if (consolidateShuffleFiles) {
- fileGroup = getUnusedFileGroup()
- Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
- val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
- blockManager.getDiskWriter(blockId, fileGroup(bucketId), serializerInstance, bufferSize,
- writeMetrics)
- }
- } else {
- Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
- val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
- val blockFile = blockManager.diskBlockManager.getFile(blockId)
- // Because of previous failures, the shuffle file may already exist on this machine.
- // If so, remove it.
- if (blockFile.exists) {
- if (blockFile.delete()) {
- logInfo(s"Removed existing shuffle file $blockFile")
- } else {
- logWarning(s"Failed to remove existing shuffle file $blockFile")
- }
- }
- blockManager.getDiskWriter(blockId, blockFile, serializerInstance, bufferSize,
- writeMetrics)
- }
- }
- // Creating the file to write to and creating a disk writer both involve interacting with
- // the disk, so should be included in the shuffle write time.
- writeMetrics.incShuffleWriteTime(System.nanoTime - openStartTime)
-
- override def releaseWriters(success: Boolean) {
- if (consolidateShuffleFiles) {
- if (success) {
- val offsets = writers.map(_.fileSegment().offset)
- val lengths = writers.map(_.fileSegment().length)
- fileGroup.recordMapOutput(mapId, offsets, lengths)
- }
- recycleFileGroup(fileGroup)
- } else {
- shuffleState.completedMapTasks.add(mapId)
- }
- }
-
- private def getUnusedFileGroup(): ShuffleFileGroup = {
- val fileGroup = shuffleState.unusedFileGroups.poll()
- if (fileGroup != null) fileGroup else newFileGroup()
- }
-
- private def newFileGroup(): ShuffleFileGroup = {
- val fileId = shuffleState.nextFileId.getAndIncrement()
- val files = Array.tabulate[File](numBuckets) { bucketId =>
- val filename = physicalFileName(shuffleId, bucketId, fileId)
- blockManager.diskBlockManager.getFile(filename)
- }
- val fileGroup = new ShuffleFileGroup(shuffleId, fileId, files)
- shuffleState.allFileGroups.add(fileGroup)
- fileGroup
- }
-
- private def recycleFileGroup(group: ShuffleFileGroup) {
- shuffleState.unusedFileGroups.add(group)
- }
- }
- }
-
- override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = {
- if (consolidateShuffleFiles) {
- // Search all file groups associated with this shuffle.
- val shuffleState = shuffleStates(blockId.shuffleId)
- val iter = shuffleState.allFileGroups.iterator
- while (iter.hasNext) {
- val segmentOpt = iter.next.getFileSegmentFor(blockId.mapId, blockId.reduceId)
- if (segmentOpt.isDefined) {
- val segment = segmentOpt.get
- return new FileSegmentManagedBuffer(
- transportConf, segment.file, segment.offset, segment.length)
- }
- }
- throw new IllegalStateException("Failed to find shuffle block: " + blockId)
- } else {
- val file = blockManager.diskBlockManager.getFile(blockId)
- new FileSegmentManagedBuffer(transportConf, file, 0, file.length)
- }
- }
-
- /** Remove all the blocks / files and metadata related to a particular shuffle. */
- def removeShuffle(shuffleId: ShuffleId): Boolean = {
- // Do not change the ordering of this; shuffleStates should be removed only
- // after the corresponding shuffle blocks have been removed
- val cleaned = removeShuffleBlocks(shuffleId)
- shuffleStates.remove(shuffleId)
- cleaned
- }
-
- /** Remove all the blocks / files related to a particular shuffle. */
- private def removeShuffleBlocks(shuffleId: ShuffleId): Boolean = {
- shuffleStates.get(shuffleId) match {
- case Some(state) =>
- if (consolidateShuffleFiles) {
- for (fileGroup <- state.allFileGroups; file <- fileGroup.files) {
- file.delete()
- }
- } else {
- for (mapId <- state.completedMapTasks; reduceId <- 0 until state.numBuckets) {
- val blockId = new ShuffleBlockId(shuffleId, mapId, reduceId)
- blockManager.diskBlockManager.getFile(blockId).delete()
- }
- }
- logInfo("Deleted all files for shuffle " + shuffleId)
- true
- case None =>
- logInfo("Could not find files for shuffle " + shuffleId + " for deleting")
- false
- }
- }
-
- private def physicalFileName(shuffleId: Int, bucketId: Int, fileId: Int) = {
- "merged_shuffle_%d_%d_%d".format(shuffleId, bucketId, fileId)
- }
-
- private def cleanup(cleanupTime: Long) {
- shuffleStates.clearOldValues(cleanupTime, (shuffleId, state) => removeShuffleBlocks(shuffleId))
- }
-
- override def stop() {
- metadataCleaner.cancel()
- }
-}
-
-private[spark]
-object FileShuffleBlockManager {
- /**
- * A group of shuffle files, one per reducer.
- * A particular mapper will be assigned a single ShuffleFileGroup to write its output to.
- */
- private class ShuffleFileGroup(val shuffleId: Int, val fileId: Int, val files: Array[File]) {
- private var numBlocks: Int = 0
-
- /**
- * Stores the absolute index of each mapId in the files of this group. For instance,
- * if mapId 5 is the first block in each file, mapIdToIndex(5) = 0.
- */
- private val mapIdToIndex = new PrimitiveKeyOpenHashMap[Int, Int]()
-
- /**
- * Stores consecutive offsets and lengths of blocks into each reducer file, ordered by
- * position in the file.
- * Note: mapIdToIndex(mapId) returns the index of the mapper into the vector for every
- * reducer.
- */
- private val blockOffsetsByReducer = Array.fill[PrimitiveVector[Long]](files.length) {
- new PrimitiveVector[Long]()
- }
- private val blockLengthsByReducer = Array.fill[PrimitiveVector[Long]](files.length) {
- new PrimitiveVector[Long]()
- }
-
- def apply(bucketId: Int): File = files(bucketId)
-
- def recordMapOutput(mapId: Int, offsets: Array[Long], lengths: Array[Long]) {
- assert(offsets.length == lengths.length)
- mapIdToIndex(mapId) = numBlocks
- numBlocks += 1
- for (i <- 0 until offsets.length) {
- blockOffsetsByReducer(i) += offsets(i)
- blockLengthsByReducer(i) += lengths(i)
- }
- }
-
- /** Returns the FileSegment associated with the given map task, or None if no entry exists. */
- def getFileSegmentFor(mapId: Int, reducerId: Int): Option[FileSegment] = {
- val file = files(reducerId)
- val blockOffsets = blockOffsetsByReducer(reducerId)
- val blockLengths = blockLengthsByReducer(reducerId)
- val index = mapIdToIndex.getOrElse(mapId, -1)
- if (index >= 0) {
- val offset = blockOffsets(index)
- val length = blockLengths(index)
- Some(new FileSegment(file, offset, length))
- } else {
- None
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/4b3bb0e4/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala
new file mode 100644
index 0000000..6ad427b
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle
+
+import java.io.File
+import java.util.concurrent.ConcurrentLinkedQueue
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.JavaConversions._
+
+import org.apache.spark.{Logging, SparkConf, SparkEnv}
+import org.apache.spark.executor.ShuffleWriteMetrics
+import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
+import org.apache.spark.network.netty.SparkTransportConf
+import org.apache.spark.serializer.Serializer
+import org.apache.spark.shuffle.FileShuffleBlockResolver.ShuffleFileGroup
+import org.apache.spark.storage._
+import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, TimeStampedHashMap}
+import org.apache.spark.util.collection.{PrimitiveKeyOpenHashMap, PrimitiveVector}
+
+/** A group of writers for a ShuffleMapTask, one writer per reducer. */
+private[spark] trait ShuffleWriterGroup {
+ val writers: Array[BlockObjectWriter]
+
+ /** @param success Indicates all writes were successful. If false, no blocks will be recorded. */
+ def releaseWriters(success: Boolean)
+}
+
+/**
+ * Manages assigning disk-based block writers to shuffle tasks. Each shuffle task gets one file
+ * per reducer (this set of files is called a ShuffleFileGroup).
+ *
+ * As an optimization to reduce the number of physical shuffle files produced, multiple shuffle
+ * blocks are aggregated into the same file. There is one "combined shuffle file" per reducer
+ * per concurrently executing shuffle task. As soon as a task finishes writing to its shuffle
+ * files, it releases them for another task.
+ * Regarding the implementation of this feature, shuffle files are identified by a 3-tuple:
+ * - shuffleId: The unique id given to the entire shuffle stage.
+ * - bucketId: The id of the output partition (i.e., reducer id)
+ * - fileId: The unique id identifying a group of "combined shuffle files." Only one task at a
+ * time owns a particular fileId, and this id is returned to a pool when the task finishes.
+ * Each shuffle file is then mapped to a FileSegment, which is a 3-tuple (file, offset, length)
+ * that specifies where in a given file the actual block data is located.
+ *
+ * Shuffle file metadata is stored in a space-efficient manner. Rather than simply mapping
+ * ShuffleBlockIds directly to FileSegments, each ShuffleFileGroup maintains a list of offsets for
+ * each block stored in each file. In order to find the location of a shuffle block, we search the
+ * files within the ShuffleFileGroups associated with the block's reducer.
+ */
+// Note: Changes to the format in this file should be kept in sync with
+// org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getHashBasedShuffleBlockData().
+private[spark] class FileShuffleBlockResolver(conf: SparkConf)
+ extends ShuffleBlockResolver with Logging {
+
+ private val transportConf = SparkTransportConf.fromSparkConf(conf)
+
+ private lazy val blockManager = SparkEnv.get.blockManager
+
+ // Turning off shuffle file consolidation causes all shuffle Blocks to get their own file.
+ // TODO: Remove this once the shuffle file consolidation feature is stable.
+ private val consolidateShuffleFiles =
+ conf.getBoolean("spark.shuffle.consolidateFiles", false)
+
+ // Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
+ private val bufferSize = conf.getSizeAsKb("spark.shuffle.file.buffer", "32k").toInt * 1024
+
+ /**
+ * Contains all the state related to a particular shuffle. This includes a pool of unused
+ * ShuffleFileGroups, as well as all ShuffleFileGroups that have been created for the shuffle.
+ */
+ private class ShuffleState(val numBuckets: Int) {
+ val nextFileId = new AtomicInteger(0)
+ val unusedFileGroups = new ConcurrentLinkedQueue[ShuffleFileGroup]()
+ val allFileGroups = new ConcurrentLinkedQueue[ShuffleFileGroup]()
+
+ /**
+ * The mapIds of all map tasks completed on this Executor for this shuffle.
+ * NB: This is only populated if consolidateShuffleFiles is FALSE. We don't need it otherwise.
+ */
+ val completedMapTasks = new ConcurrentLinkedQueue[Int]()
+ }
+
+ private val shuffleStates = new TimeStampedHashMap[ShuffleId, ShuffleState]
+
+ private val metadataCleaner =
+ new MetadataCleaner(MetadataCleanerType.SHUFFLE_BLOCK_MANAGER, this.cleanup, conf)
+
+ /**
+ * Get a ShuffleWriterGroup for the given map task, which will register it as complete
+ * when the writers are closed successfully
+ */
+ def forMapTask(shuffleId: Int, mapId: Int, numBuckets: Int, serializer: Serializer,
+ writeMetrics: ShuffleWriteMetrics): ShuffleWriterGroup = {
+ new ShuffleWriterGroup {
+ shuffleStates.putIfAbsent(shuffleId, new ShuffleState(numBuckets))
+ private val shuffleState = shuffleStates(shuffleId)
+ private var fileGroup: ShuffleFileGroup = null
+
+ val openStartTime = System.nanoTime
+ val serializerInstance = serializer.newInstance()
+ val writers: Array[BlockObjectWriter] = if (consolidateShuffleFiles) {
+ fileGroup = getUnusedFileGroup()
+ Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
+ val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
+ blockManager.getDiskWriter(blockId, fileGroup(bucketId), serializerInstance, bufferSize,
+ writeMetrics)
+ }
+ } else {
+ Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
+ val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
+ val blockFile = blockManager.diskBlockManager.getFile(blockId)
+ // Because of previous failures, the shuffle file may already exist on this machine.
+ // If so, remove it.
+ if (blockFile.exists) {
+ if (blockFile.delete()) {
+ logInfo(s"Removed existing shuffle file $blockFile")
+ } else {
+ logWarning(s"Failed to remove existing shuffle file $blockFile")
+ }
+ }
+ blockManager.getDiskWriter(blockId, blockFile, serializerInstance, bufferSize,
+ writeMetrics)
+ }
+ }
+ // Creating the file to write to and creating a disk writer both involve interacting with
+ // the disk, so should be included in the shuffle write time.
+ writeMetrics.incShuffleWriteTime(System.nanoTime - openStartTime)
+
+ override def releaseWriters(success: Boolean) {
+ if (consolidateShuffleFiles) {
+ if (success) {
+ val offsets = writers.map(_.fileSegment().offset)
+ val lengths = writers.map(_.fileSegment().length)
+ fileGroup.recordMapOutput(mapId, offsets, lengths)
+ }
+ recycleFileGroup(fileGroup)
+ } else {
+ shuffleState.completedMapTasks.add(mapId)
+ }
+ }
+
+ private def getUnusedFileGroup(): ShuffleFileGroup = {
+ val fileGroup = shuffleState.unusedFileGroups.poll()
+ if (fileGroup != null) fileGroup else newFileGroup()
+ }
+
+ private def newFileGroup(): ShuffleFileGroup = {
+ val fileId = shuffleState.nextFileId.getAndIncrement()
+ val files = Array.tabulate[File](numBuckets) { bucketId =>
+ val filename = physicalFileName(shuffleId, bucketId, fileId)
+ blockManager.diskBlockManager.getFile(filename)
+ }
+ val fileGroup = new ShuffleFileGroup(shuffleId, fileId, files)
+ shuffleState.allFileGroups.add(fileGroup)
+ fileGroup
+ }
+
+ private def recycleFileGroup(group: ShuffleFileGroup) {
+ shuffleState.unusedFileGroups.add(group)
+ }
+ }
+ }
+
+ override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = {
+ if (consolidateShuffleFiles) {
+ // Search all file groups associated with this shuffle.
+ val shuffleState = shuffleStates(blockId.shuffleId)
+ val iter = shuffleState.allFileGroups.iterator
+ while (iter.hasNext) {
+ val segmentOpt = iter.next.getFileSegmentFor(blockId.mapId, blockId.reduceId)
+ if (segmentOpt.isDefined) {
+ val segment = segmentOpt.get
+ return new FileSegmentManagedBuffer(
+ transportConf, segment.file, segment.offset, segment.length)
+ }
+ }
+ throw new IllegalStateException("Failed to find shuffle block: " + blockId)
+ } else {
+ val file = blockManager.diskBlockManager.getFile(blockId)
+ new FileSegmentManagedBuffer(transportConf, file, 0, file.length)
+ }
+ }
+
+ /** Remove all the blocks / files and metadata related to a particular shuffle. */
+ def removeShuffle(shuffleId: ShuffleId): Boolean = {
+ // Do not change the ordering of this; shuffleStates should be removed only
+ // after the corresponding shuffle blocks have been removed
+ val cleaned = removeShuffleBlocks(shuffleId)
+ shuffleStates.remove(shuffleId)
+ cleaned
+ }
+
+ /** Remove all the blocks / files related to a particular shuffle. */
+ private def removeShuffleBlocks(shuffleId: ShuffleId): Boolean = {
+ shuffleStates.get(shuffleId) match {
+ case Some(state) =>
+ if (consolidateShuffleFiles) {
+ for (fileGroup <- state.allFileGroups; file <- fileGroup.files) {
+ file.delete()
+ }
+ } else {
+ for (mapId <- state.completedMapTasks; reduceId <- 0 until state.numBuckets) {
+ val blockId = new ShuffleBlockId(shuffleId, mapId, reduceId)
+ blockManager.diskBlockManager.getFile(blockId).delete()
+ }
+ }
+ logInfo("Deleted all files for shuffle " + shuffleId)
+ true
+ case None =>
+ logInfo("Could not find files for shuffle " + shuffleId + " for deleting")
+ false
+ }
+ }
+
+ private def physicalFileName(shuffleId: Int, bucketId: Int, fileId: Int) = {
+ "merged_shuffle_%d_%d_%d".format(shuffleId, bucketId, fileId)
+ }
+
+ private def cleanup(cleanupTime: Long) {
+ shuffleStates.clearOldValues(cleanupTime, (shuffleId, state) => removeShuffleBlocks(shuffleId))
+ }
+
+ override def stop() {
+ metadataCleaner.cancel()
+ }
+}
+
+private[spark] object FileShuffleBlockResolver {
+ /**
+ * A group of shuffle files, one per reducer.
+ * A particular mapper will be assigned a single ShuffleFileGroup to write its output to.
+ */
+ private class ShuffleFileGroup(val shuffleId: Int, val fileId: Int, val files: Array[File]) {
+ private var numBlocks: Int = 0
+
+ /**
+ * Stores the absolute index of each mapId in the files of this group. For instance,
+ * if mapId 5 is the first block in each file, mapIdToIndex(5) = 0.
+ */
+ private val mapIdToIndex = new PrimitiveKeyOpenHashMap[Int, Int]()
+
+ /**
+ * Stores consecutive offsets and lengths of blocks into each reducer file, ordered by
+ * position in the file.
+ * Note: mapIdToIndex(mapId) returns the index of the mapper into the vector for every
+ * reducer.
+ */
+ private val blockOffsetsByReducer = Array.fill[PrimitiveVector[Long]](files.length) {
+ new PrimitiveVector[Long]()
+ }
+ private val blockLengthsByReducer = Array.fill[PrimitiveVector[Long]](files.length) {
+ new PrimitiveVector[Long]()
+ }
+
+ def apply(bucketId: Int): File = files(bucketId)
+
+ def recordMapOutput(mapId: Int, offsets: Array[Long], lengths: Array[Long]) {
+ assert(offsets.length == lengths.length)
+ mapIdToIndex(mapId) = numBlocks
+ numBlocks += 1
+ for (i <- 0 until offsets.length) {
+ blockOffsetsByReducer(i) += offsets(i)
+ blockLengthsByReducer(i) += lengths(i)
+ }
+ }
+
+ /** Returns the FileSegment associated with the given map task, or None if no entry exists. */
+ def getFileSegmentFor(mapId: Int, reducerId: Int): Option[FileSegment] = {
+ val file = files(reducerId)
+ val blockOffsets = blockOffsetsByReducer(reducerId)
+ val blockLengths = blockLengthsByReducer(reducerId)
+ val index = mapIdToIndex.getOrElse(mapId, -1)
+ if (index >= 0) {
+ val offset = blockOffsets(index)
+ val length = blockLengths(index)
+ Some(new FileSegment(file, offset, length))
+ } else {
+ None
+ }
+ }
+ }
+}
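
As a reading aid for the consolidation scheme the scaladoc above describes, the sketch below (plain Scala) shows how a ShuffleFileGroup-style structure resolves a (mapId, reducerId) pair to a segment of a combined per-reducer file. FileGroupSketch and SegmentSketch are made-up names, and mutable.Map/ArrayBuffer stand in for Spark's PrimitiveKeyOpenHashMap and PrimitiveVector; this is a simplified illustration, not code from this diff.

import java.io.File
import scala.collection.mutable

// Stand-in for org.apache.spark.storage.FileSegment, for illustration only.
case class SegmentSketch(file: File, offset: Long, length: Long)

// Simplified analogue of ShuffleFileGroup: one file per reducer, plus per-reducer
// parallel vectors of block offsets/lengths, indexed by map-task insertion order.
class FileGroupSketch(files: Array[File]) {
  private var numBlocks = 0
  private val mapIdToIndex = mutable.Map.empty[Int, Int]
  private val offsetsByReducer = Array.fill(files.length)(mutable.ArrayBuffer.empty[Long])
  private val lengthsByReducer = Array.fill(files.length)(mutable.ArrayBuffer.empty[Long])

  // Called when a map task finishes: offsets(i)/lengths(i) describe its block in reducer i's file.
  def recordMapOutput(mapId: Int, offsets: Array[Long], lengths: Array[Long]): Unit = {
    mapIdToIndex(mapId) = numBlocks
    numBlocks += 1
    for (i <- offsets.indices) {
      offsetsByReducer(i) += offsets(i)
      lengthsByReducer(i) += lengths(i)
    }
  }

  // Mirrors getFileSegmentFor: find where mapId's block lives in reducerId's combined file.
  def segmentFor(mapId: Int, reducerId: Int): Option[SegmentSketch] =
    mapIdToIndex.get(mapId).map { idx =>
      SegmentSketch(files(reducerId), offsetsByReducer(reducerId)(idx), lengthsByReducer(reducerId)(idx))
    }
}

The point of this layout is that one small per-group index replaces a direct map from every ShuffleBlockId to a FileSegment, which is what keeps the metadata space-efficient.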
http://git-wip-us.apache.org/repos/asf/spark/blob/4b3bb0e4/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala
deleted file mode 100644
index a1741e2..0000000
--- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.shuffle
-
-import java.io._
-import java.nio.ByteBuffer
-
-import com.google.common.io.ByteStreams
-
-import org.apache.spark.{SparkConf, SparkEnv}
-import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
-import org.apache.spark.network.netty.SparkTransportConf
-import org.apache.spark.storage._
-import org.apache.spark.util.Utils
-
-import IndexShuffleBlockManager.NOOP_REDUCE_ID
-
-/**
- * Create and maintain the shuffle blocks' mapping between logic block and physical file location.
- * Data of shuffle blocks from the same map task are stored in a single consolidated data file.
- * The offsets of the data blocks in the data file are stored in a separate index file.
- *
- * We use the name of the shuffle data's shuffleBlockId with reduce ID set to 0 and add ".data"
- * as the filename postfix for data file, and ".index" as the filename postfix for index file.
- *
- */
-// Note: Changes to the format in this file should be kept in sync with
-// org.apache.spark.network.shuffle.StandaloneShuffleBlockManager#getSortBasedShuffleBlockData().
-private[spark]
-class IndexShuffleBlockManager(conf: SparkConf) extends ShuffleBlockResolver {
-
- private lazy val blockManager = SparkEnv.get.blockManager
-
- private val transportConf = SparkTransportConf.fromSparkConf(conf)
-
- def getDataFile(shuffleId: Int, mapId: Int): File = {
- blockManager.diskBlockManager.getFile(ShuffleDataBlockId(shuffleId, mapId, NOOP_REDUCE_ID))
- }
-
- private def getIndexFile(shuffleId: Int, mapId: Int): File = {
- blockManager.diskBlockManager.getFile(ShuffleIndexBlockId(shuffleId, mapId, NOOP_REDUCE_ID))
- }
-
- /**
- * Remove data file and index file that contain the output data from one map.
- * */
- def removeDataByMap(shuffleId: Int, mapId: Int): Unit = {
- var file = getDataFile(shuffleId, mapId)
- if (file.exists()) {
- file.delete()
- }
-
- file = getIndexFile(shuffleId, mapId)
- if (file.exists()) {
- file.delete()
- }
- }
-
- /**
- * Write an index file with the offsets of each block, plus a final offset at the end for the
- * end of the output file. This will be used by getBlockLocation to figure out where each block
- * begins and ends.
- * */
- def writeIndexFile(shuffleId: Int, mapId: Int, lengths: Array[Long]): Unit = {
- val indexFile = getIndexFile(shuffleId, mapId)
- val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexFile)))
- Utils.tryWithSafeFinally {
- // We take in lengths of each block, need to convert it to offsets.
- var offset = 0L
- out.writeLong(offset)
- for (length <- lengths) {
- offset += length
- out.writeLong(offset)
- }
- } {
- out.close()
- }
- }
-
- override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = {
- // The block is actually going to be a range of a single map output file for this map, so
- // find out the consolidated file, then the offset within that from our index
- val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId)
-
- val in = new DataInputStream(new FileInputStream(indexFile))
- try {
- ByteStreams.skipFully(in, blockId.reduceId * 8)
- val offset = in.readLong()
- val nextOffset = in.readLong()
- new FileSegmentManagedBuffer(
- transportConf,
- getDataFile(blockId.shuffleId, blockId.mapId),
- offset,
- nextOffset - offset)
- } finally {
- in.close()
- }
- }
-
- override def stop(): Unit = {}
-}
-
-private[spark] object IndexShuffleBlockManager {
- // No-op reduce ID used in interactions with disk store and BlockObjectWriter.
- // The disk store currently expects puts to relate to a (map, reduce) pair, but in the sort
- // shuffle outputs for several reduces are glommed into a single file.
- // TODO: Avoid this entirely by having the DiskBlockObjectWriter not require a BlockId.
- val NOOP_REDUCE_ID = 0
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/4b3bb0e4/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
new file mode 100644
index 0000000..d9c63b6
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle
+
+import java.io._
+
+import com.google.common.io.ByteStreams
+
+import org.apache.spark.{SparkConf, SparkEnv}
+import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
+import org.apache.spark.network.netty.SparkTransportConf
+import org.apache.spark.storage._
+import org.apache.spark.util.Utils
+
+import IndexShuffleBlockResolver.NOOP_REDUCE_ID
+
+/**
+ * Create and maintain the shuffle blocks' mapping between logic block and physical file location.
+ * Data of shuffle blocks from the same map task are stored in a single consolidated data file.
+ * The offsets of the data blocks in the data file are stored in a separate index file.
+ *
+ * We use the name of the shuffle data's shuffleBlockId with reduce ID set to 0 and add ".data"
+ * as the filename postfix for data file, and ".index" as the filename postfix for index file.
+ *
+ */
+// Note: Changes to the format in this file should be kept in sync with
+// org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getSortBasedShuffleBlockData().
+private[spark] class IndexShuffleBlockResolver(conf: SparkConf) extends ShuffleBlockResolver {
+
+ private lazy val blockManager = SparkEnv.get.blockManager
+
+ private val transportConf = SparkTransportConf.fromSparkConf(conf)
+
+ def getDataFile(shuffleId: Int, mapId: Int): File = {
+ blockManager.diskBlockManager.getFile(ShuffleDataBlockId(shuffleId, mapId, NOOP_REDUCE_ID))
+ }
+
+ private def getIndexFile(shuffleId: Int, mapId: Int): File = {
+ blockManager.diskBlockManager.getFile(ShuffleIndexBlockId(shuffleId, mapId, NOOP_REDUCE_ID))
+ }
+
+ /**
+ * Remove data file and index file that contain the output data from one map.
+ * */
+ def removeDataByMap(shuffleId: Int, mapId: Int): Unit = {
+ var file = getDataFile(shuffleId, mapId)
+ if (file.exists()) {
+ file.delete()
+ }
+
+ file = getIndexFile(shuffleId, mapId)
+ if (file.exists()) {
+ file.delete()
+ }
+ }
+
+ /**
+ * Write an index file with the offsets of each block, plus a final offset at the end for the
+ * end of the output file. This will be used by getBlockLocation to figure out where each block
+ * begins and ends.
+ * */
+ def writeIndexFile(shuffleId: Int, mapId: Int, lengths: Array[Long]): Unit = {
+ val indexFile = getIndexFile(shuffleId, mapId)
+ val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexFile)))
+ Utils.tryWithSafeFinally {
+ // We take in lengths of each block, need to convert it to offsets.
+ var offset = 0L
+ out.writeLong(offset)
+ for (length <- lengths) {
+ offset += length
+ out.writeLong(offset)
+ }
+ } {
+ out.close()
+ }
+ }
+
+ override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = {
+ // The block is actually going to be a range of a single map output file for this map, so
+ // find out the consolidated file, then the offset within that from our index
+ val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId)
+
+ val in = new DataInputStream(new FileInputStream(indexFile))
+ try {
+ ByteStreams.skipFully(in, blockId.reduceId * 8)
+ val offset = in.readLong()
+ val nextOffset = in.readLong()
+ new FileSegmentManagedBuffer(
+ transportConf,
+ getDataFile(blockId.shuffleId, blockId.mapId),
+ offset,
+ nextOffset - offset)
+ } finally {
+ in.close()
+ }
+ }
+
+ override def stop(): Unit = {}
+}
+
+private[spark] object IndexShuffleBlockResolver {
+ // No-op reduce ID used in interactions with disk store and BlockObjectWriter.
+ // The disk store currently expects puts to relate to a (map, reduce) pair, but in the sort
+ // shuffle outputs for several reduces are glommed into a single file.
+ // TODO: Avoid this entirely by having the DiskBlockObjectWriter not require a BlockId.
+ val NOOP_REDUCE_ID = 0
+}
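
To make the index-file arithmetic above concrete, here is a minimal self-contained sketch that writes and reads the same layout against an in-memory buffer instead of real files. IndexFormatSketch and its variable names are illustrative, but the layout mirrors writeIndexFile/getBlockData in the diff: N+1 longs, with block i spanning bytes [offsets(i), offsets(i+1)) of the data file.

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

object IndexFormatSketch {
  def main(args: Array[String]): Unit = {
    val lengths = Array(100L, 250L, 75L)  // per-reducer block lengths from one map task

    // Write the index as writeIndexFile does: a leading 0, then cumulative offsets.
    val buf = new ByteArrayOutputStream()
    val out = new DataOutputStream(buf)
    var offset = 0L
    out.writeLong(offset)
    for (length <- lengths) {
      offset += length
      out.writeLong(offset)
    }
    out.close()

    // Read one block's bounds as getBlockData does: skip reduceId longs, read two.
    val reduceId = 1
    val in = new DataInputStream(new ByteArrayInputStream(buf.toByteArray))
    in.skipBytes(reduceId * 8)  // each offset is an 8-byte long
    val start = in.readLong()
    val next = in.readLong()
    println(s"reduce $reduceId -> offset=$start, length=${next - start}")  // offset=100, length=250
  }
}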
http://git-wip-us.apache.org/repos/asf/spark/blob/4b3bb0e4/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala
index 2a7df8d..c089088 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala
@@ -26,7 +26,7 @@ import org.apache.spark.shuffle._
*/
private[spark] class HashShuffleManager(conf: SparkConf) extends ShuffleManager {
- private val fileShuffleBlockManager = new FileShuffleBlockManager(conf)
+ private val fileShuffleBlockResolver = new FileShuffleBlockResolver(conf)
/* Register a shuffle with the manager and obtain a handle for it to pass to tasks. */
override def registerShuffle[K, V, C](
@@ -61,8 +61,8 @@ private[spark] class HashShuffleManager(conf: SparkConf) extends ShuffleManager
shuffleBlockResolver.removeShuffle(shuffleId)
}
- override def shuffleBlockResolver: FileShuffleBlockManager = {
- fileShuffleBlockManager
+ override def shuffleBlockResolver: FileShuffleBlockResolver = {
+ fileShuffleBlockResolver
}
/** Shut down this ShuffleManager. */
http://git-wip-us.apache.org/repos/asf/spark/blob/4b3bb0e4/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
index cd27c9e..897f0a5 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
@@ -25,7 +25,7 @@ import org.apache.spark.shuffle._
import org.apache.spark.storage.BlockObjectWriter
private[spark] class HashShuffleWriter[K, V](
- shuffleBlockManager: FileShuffleBlockManager,
+ shuffleBlockResolver: FileShuffleBlockResolver,
handle: BaseShuffleHandle[K, V, _],
mapId: Int,
context: TaskContext)
@@ -45,7 +45,7 @@ private[spark] class HashShuffleWriter[K, V](
private val blockManager = SparkEnv.get.blockManager
private val ser = Serializer.getSerializer(dep.serializer.getOrElse(null))
- private val shuffle = shuffleBlockManager.forMapTask(dep.shuffleId, mapId, numOutputSplits, ser,
+ private val shuffle = shuffleBlockResolver.forMapTask(dep.shuffleId, mapId, numOutputSplits, ser,
writeMetrics)
/** Write a bunch of records to this task's output */
http://git-wip-us.apache.org/repos/asf/spark/blob/4b3bb0e4/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
index 0497036..1584294 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
@@ -25,7 +25,7 @@ import org.apache.spark.shuffle.hash.HashShuffleReader
private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager {
- private val indexShuffleBlockManager = new IndexShuffleBlockManager(conf)
+ private val indexShuffleBlockResolver = new IndexShuffleBlockResolver(conf)
private val shuffleMapNumber = new ConcurrentHashMap[Int, Int]()
/**
@@ -72,8 +72,8 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager
true
}
- override def shuffleBlockResolver: IndexShuffleBlockManager = {
- indexShuffleBlockManager
+ override def shuffleBlockResolver: IndexShuffleBlockResolver = {
+ indexShuffleBlockResolver
}
/** Shut down this ShuffleManager. */
http://git-wip-us.apache.org/repos/asf/spark/blob/4b3bb0e4/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
index a066435..add2656 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
@@ -20,12 +20,12 @@ package org.apache.spark.shuffle.sort
import org.apache.spark.{MapOutputTracker, SparkEnv, Logging, TaskContext}
import org.apache.spark.executor.ShuffleWriteMetrics
import org.apache.spark.scheduler.MapStatus
-import org.apache.spark.shuffle.{IndexShuffleBlockManager, ShuffleWriter, BaseShuffleHandle}
+import org.apache.spark.shuffle.{IndexShuffleBlockResolver, ShuffleWriter, BaseShuffleHandle}
import org.apache.spark.storage.ShuffleBlockId
import org.apache.spark.util.collection.ExternalSorter
private[spark] class SortShuffleWriter[K, V, C](
- shuffleBlockManager: IndexShuffleBlockManager,
+ shuffleBlockResolver: IndexShuffleBlockResolver,
handle: BaseShuffleHandle[K, V, C],
mapId: Int,
context: TaskContext)
@@ -65,10 +65,10 @@ private[spark] class SortShuffleWriter[K, V, C](
// Don't bother including the time to open the merged output file in the shuffle write time,
// because it just opens a single file, so is typically too fast to measure accurately
// (see SPARK-3570).
- val outputFile = shuffleBlockManager.getDataFile(dep.shuffleId, mapId)
- val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockManager.NOOP_REDUCE_ID)
+ val outputFile = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
+ val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
val partitionLengths = sorter.writePartitionedFile(blockId, context, outputFile)
- shuffleBlockManager.writeIndexFile(dep.shuffleId, mapId, partitionLengths)
+ shuffleBlockResolver.writeIndexFile(dep.shuffleId, mapId, partitionLengths)
mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
}
@@ -84,7 +84,7 @@ private[spark] class SortShuffleWriter[K, V, C](
return Option(mapStatus)
} else {
// The map task failed, so delete our output data.
- shuffleBlockManager.removeDataByMap(dep.shuffleId, mapId)
+ shuffleBlockResolver.removeDataByMap(dep.shuffleId, mapId)
return None
}
} finally {
http://git-wip-us.apache.org/repos/asf/spark/blob/4b3bb0e4/core/src/main/scala/org/apache/spark/storage/BlockId.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockId.scala b/core/src/main/scala/org/apache/spark/storage/BlockId.scala
index c186fd3..524f697 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockId.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockId.scala
@@ -54,7 +54,7 @@ case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId {
}
// Format of the shuffle block ids (including data and index) should be kept in sync with
-// org.apache.spark.network.shuffle.StandaloneShuffleBlockManager#getBlockData().
+// org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getBlockData().
@DeveloperApi
case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int) extends BlockId {
override def name: String = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId
http://git-wip-us.apache.org/repos/asf/spark/blob/4b3bb0e4/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index a46fecd..cc794e5 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -431,10 +431,11 @@ private[spark] class BlockManager(
// As an optimization for map output fetches, if the block is for a shuffle, return it
// without acquiring a lock; the disk store never deletes (recent) items so this should work
if (blockId.isShuffle) {
- val shuffleBlockManager = shuffleManager.shuffleBlockResolver
+ val shuffleBlockResolver = shuffleManager.shuffleBlockResolver
// TODO: This should gracefully handle case where local block is not available. Currently
// downstream code will throw an exception.
- Option(shuffleBlockManager.getBlockData(blockId.asInstanceOf[ShuffleBlockId]).nioByteBuffer())
+ Option(
+ shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId]).nioByteBuffer())
} else {
doGetLocal(blockId, asBlockResult = false).asInstanceOf[Option[ByteBuffer]]
}
http://git-wip-us.apache.org/repos/asf/spark/blob/4b3bb0e4/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index 5764c16..2a44477 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -55,7 +55,7 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
/** Looks up a file by hashing it into one of our local subdirectories. */
// This method should be kept in sync with
- // org.apache.spark.network.shuffle.StandaloneShuffleBlockManager#getFile().
+ // org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getFile().
def getFile(filename: String): File = {
// Figure out which local directory it hashes to, and which subdirectory in that
val hash = Utils.nonNegativeHash(filename)
http://git-wip-us.apache.org/repos/asf/spark/blob/4b3bb0e4/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleManagerSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleManagerSuite.scala
index 84384bb..0537bf6 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleManagerSuite.scala
@@ -27,7 +27,7 @@ import org.apache.spark.{SparkEnv, SparkContext, LocalSparkContext, SparkConf}
import org.apache.spark.executor.ShuffleWriteMetrics
import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
import org.apache.spark.serializer.JavaSerializer
-import org.apache.spark.shuffle.FileShuffleBlockManager
+import org.apache.spark.shuffle.FileShuffleBlockResolver
import org.apache.spark.storage.{ShuffleBlockId, FileSegment}
class HashShuffleManagerSuite extends FunSuite with LocalSparkContext {
@@ -53,10 +53,10 @@ class HashShuffleManagerSuite extends FunSuite with LocalSparkContext {
sc = new SparkContext("local", "test", conf)
- val shuffleBlockManager =
- SparkEnv.get.shuffleManager.shuffleBlockResolver.asInstanceOf[FileShuffleBlockManager]
+ val shuffleBlockResolver =
+ SparkEnv.get.shuffleManager.shuffleBlockResolver.asInstanceOf[FileShuffleBlockResolver]
- val shuffle1 = shuffleBlockManager.forMapTask(1, 1, 1, new JavaSerializer(conf),
+ val shuffle1 = shuffleBlockResolver.forMapTask(1, 1, 1, new JavaSerializer(conf),
new ShuffleWriteMetrics)
for (writer <- shuffle1.writers) {
writer.write("test1", "value")
@@ -69,7 +69,7 @@ class HashShuffleManagerSuite extends FunSuite with LocalSparkContext {
val shuffle1Segment = shuffle1.writers(0).fileSegment()
shuffle1.releaseWriters(success = true)
- val shuffle2 = shuffleBlockManager.forMapTask(1, 2, 1, new JavaSerializer(conf),
+ val shuffle2 = shuffleBlockResolver.forMapTask(1, 2, 1, new JavaSerializer(conf),
new ShuffleWriteMetrics)
for (writer <- shuffle2.writers) {
@@ -88,7 +88,7 @@ class HashShuffleManagerSuite extends FunSuite with LocalSparkContext {
// of block based on remaining data in file : which could mess things up when there is
// concurrent read and writes happening to the same shuffle group.
- val shuffle3 = shuffleBlockManager.forMapTask(1, 3, 1, new JavaSerializer(testConf),
+ val shuffle3 = shuffleBlockResolver.forMapTask(1, 3, 1, new JavaSerializer(testConf),
new ShuffleWriteMetrics)
for (writer <- shuffle3.writers) {
writer.write("test3", "value")
@@ -98,10 +98,10 @@ class HashShuffleManagerSuite extends FunSuite with LocalSparkContext {
writer.commitAndClose()
}
// check before we register.
- checkSegments(shuffle2Segment, shuffleBlockManager.getBlockData(ShuffleBlockId(1, 2, 0)))
+ checkSegments(shuffle2Segment, shuffleBlockResolver.getBlockData(ShuffleBlockId(1, 2, 0)))
shuffle3.releaseWriters(success = true)
- checkSegments(shuffle2Segment, shuffleBlockManager.getBlockData(ShuffleBlockId(1, 2, 0)))
- shuffleBlockManager.removeShuffle(1)
+ checkSegments(shuffle2Segment, shuffleBlockResolver.getBlockData(ShuffleBlockId(1, 2, 0)))
+ shuffleBlockResolver.removeShuffle(1)
}
def writeToFile(file: File, numBytes: Int) {
http://git-wip-us.apache.org/repos/asf/spark/blob/4b3bb0e4/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
----------------------------------------------------------------------
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
index 46ca970..e4faaf8 100644
--- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
+++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
@@ -46,18 +46,18 @@ import org.apache.spark.network.shuffle.protocol.StreamHandle;
public class ExternalShuffleBlockHandler extends RpcHandler {
private final Logger logger = LoggerFactory.getLogger(ExternalShuffleBlockHandler.class);
- private final ExternalShuffleBlockManager blockManager;
+ private final ExternalShuffleBlockResolver blockManager;
private final OneForOneStreamManager streamManager;
public ExternalShuffleBlockHandler(TransportConf conf) {
- this(new OneForOneStreamManager(), new ExternalShuffleBlockManager(conf));
+ this(new OneForOneStreamManager(), new ExternalShuffleBlockResolver(conf));
}
/** Enables mocking out the StreamManager and BlockManager. */
@VisibleForTesting
ExternalShuffleBlockHandler(
OneForOneStreamManager streamManager,
- ExternalShuffleBlockManager blockManager) {
+ ExternalShuffleBlockResolver blockManager) {
this.streamManager = streamManager;
this.blockManager = blockManager;
}
http://git-wip-us.apache.org/repos/asf/spark/blob/4b3bb0e4/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManager.java
----------------------------------------------------------------------
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManager.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManager.java
deleted file mode 100644
index 93e6fdd..0000000
--- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManager.java
+++ /dev/null
@@ -1,254 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.shuffle;
-
-import java.io.DataInputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Objects;
-import com.google.common.collect.Maps;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.spark.network.buffer.FileSegmentManagedBuffer;
-import org.apache.spark.network.buffer.ManagedBuffer;
-import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
-import org.apache.spark.network.util.JavaUtils;
-import org.apache.spark.network.util.NettyUtils;
-import org.apache.spark.network.util.TransportConf;
-
-/**
- * Manages converting shuffle BlockIds into physical segments of local files, from a process outside
- * of Executors. Each Executor must register its own configuration about where it stores its files
- * (local dirs) and how (shuffle manager). The logic for retrieval of individual files is replicated
- * from Spark's FileShuffleBlockManager and IndexShuffleBlockManager.
- *
- * Executors with shuffle file consolidation are not currently supported, as the index is stored in
- * the Executor's memory, unlike the IndexShuffleBlockManager.
- */
-public class ExternalShuffleBlockManager {
- private static final Logger logger = LoggerFactory.getLogger(ExternalShuffleBlockManager.class);
-
- // Map containing all registered executors' metadata.
- private final ConcurrentMap<AppExecId, ExecutorShuffleInfo> executors;
-
- // Single-threaded Java executor used to perform expensive recursive directory deletion.
- private final Executor directoryCleaner;
-
- private final TransportConf conf;
-
- public ExternalShuffleBlockManager(TransportConf conf) {
- this(conf, Executors.newSingleThreadExecutor(
- // Add `spark` prefix because it will run in NM in Yarn mode.
- NettyUtils.createThreadFactory("spark-shuffle-directory-cleaner")));
- }
-
- // Allows tests to have more control over when directories are cleaned up.
- @VisibleForTesting
- ExternalShuffleBlockManager(TransportConf conf, Executor directoryCleaner) {
- this.conf = conf;
- this.executors = Maps.newConcurrentMap();
- this.directoryCleaner = directoryCleaner;
- }
-
- /** Registers a new Executor with all the configuration we need to find its shuffle files. */
- public void registerExecutor(
- String appId,
- String execId,
- ExecutorShuffleInfo executorInfo) {
- AppExecId fullId = new AppExecId(appId, execId);
- logger.info("Registered executor {} with {}", fullId, executorInfo);
- executors.put(fullId, executorInfo);
- }
-
- /**
- * Obtains a FileSegmentManagedBuffer from a shuffle block id. We expect the blockId has the
- * format "shuffle_ShuffleId_MapId_ReduceId" (from ShuffleBlockId), and additionally make
- * assumptions about how the hash and sort based shuffles store their data.
- */
- public ManagedBuffer getBlockData(String appId, String execId, String blockId) {
- String[] blockIdParts = blockId.split("_");
- if (blockIdParts.length < 4) {
- throw new IllegalArgumentException("Unexpected block id format: " + blockId);
- } else if (!blockIdParts[0].equals("shuffle")) {
- throw new IllegalArgumentException("Expected shuffle block id, got: " + blockId);
- }
- int shuffleId = Integer.parseInt(blockIdParts[1]);
- int mapId = Integer.parseInt(blockIdParts[2]);
- int reduceId = Integer.parseInt(blockIdParts[3]);
-
- ExecutorShuffleInfo executor = executors.get(new AppExecId(appId, execId));
- if (executor == null) {
- throw new RuntimeException(
- String.format("Executor is not registered (appId=%s, execId=%s)", appId, execId));
- }
-
- if ("org.apache.spark.shuffle.hash.HashShuffleManager".equals(executor.shuffleManager)) {
- return getHashBasedShuffleBlockData(executor, blockId);
- } else if ("org.apache.spark.shuffle.sort.SortShuffleManager".equals(executor.shuffleManager)) {
- return getSortBasedShuffleBlockData(executor, shuffleId, mapId, reduceId);
- } else {
- throw new UnsupportedOperationException(
- "Unsupported shuffle manager: " + executor.shuffleManager);
- }
- }
-
- /**
- * Removes our metadata of all executors registered for the given application, and optionally
- * also deletes the local directories associated with the executors of that application in a
- * separate thread.
- *
- * It is not valid to call registerExecutor() for an executor with this appId after invoking
- * this method.
- */
- public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
- logger.info("Application {} removed, cleanupLocalDirs = {}", appId, cleanupLocalDirs);
- Iterator<Map.Entry<AppExecId, ExecutorShuffleInfo>> it = executors.entrySet().iterator();
- while (it.hasNext()) {
- Map.Entry<AppExecId, ExecutorShuffleInfo> entry = it.next();
- AppExecId fullId = entry.getKey();
- final ExecutorShuffleInfo executor = entry.getValue();
-
- // Only touch executors associated with the appId that was removed.
- if (appId.equals(fullId.appId)) {
- it.remove();
-
- if (cleanupLocalDirs) {
- logger.info("Cleaning up executor {}'s {} local dirs", fullId, executor.localDirs.length);
-
- // Execute the actual deletion in a different thread, as it may take some time.
- directoryCleaner.execute(new Runnable() {
- @Override
- public void run() {
- deleteExecutorDirs(executor.localDirs);
- }
- });
- }
- }
- }
- }
-
- /**
- * Synchronously deletes each directory one at a time.
- * Should be executed in its own thread, as this may take a long time.
- */
- private void deleteExecutorDirs(String[] dirs) {
- for (String localDir : dirs) {
- try {
- JavaUtils.deleteRecursively(new File(localDir));
- logger.debug("Successfully cleaned up directory: " + localDir);
- } catch (Exception e) {
- logger.error("Failed to delete directory: " + localDir, e);
- }
- }
- }
-
- /**
- * Hash-based shuffle data is simply stored as one file per block.
- * This logic is from FileShuffleBlockManager.
- */
- // TODO: Support consolidated hash shuffle files
- private ManagedBuffer getHashBasedShuffleBlockData(ExecutorShuffleInfo executor, String blockId) {
- File shuffleFile = getFile(executor.localDirs, executor.subDirsPerLocalDir, blockId);
- return new FileSegmentManagedBuffer(conf, shuffleFile, 0, shuffleFile.length());
- }
-
- /**
- * Sort-based shuffle data uses an index called "shuffle_ShuffleId_MapId_0.index" into a data file
- * called "shuffle_ShuffleId_MapId_0.data". This logic is from IndexShuffleBlockManager,
- * and the block id format is from ShuffleDataBlockId and ShuffleIndexBlockId.
- */
- private ManagedBuffer getSortBasedShuffleBlockData(
- ExecutorShuffleInfo executor, int shuffleId, int mapId, int reduceId) {
- File indexFile = getFile(executor.localDirs, executor.subDirsPerLocalDir,
- "shuffle_" + shuffleId + "_" + mapId + "_0.index");
-
- DataInputStream in = null;
- try {
- in = new DataInputStream(new FileInputStream(indexFile));
- in.skipBytes(reduceId * 8);
- long offset = in.readLong();
- long nextOffset = in.readLong();
- return new FileSegmentManagedBuffer(
- conf,
- getFile(executor.localDirs, executor.subDirsPerLocalDir,
- "shuffle_" + shuffleId + "_" + mapId + "_0.data"),
- offset,
- nextOffset - offset);
- } catch (IOException e) {
- throw new RuntimeException("Failed to open file: " + indexFile, e);
- } finally {
- if (in != null) {
- JavaUtils.closeQuietly(in);
- }
- }
- }
-
- /**
- * Hashes a filename into the corresponding local directory, in a manner consistent with
- * Spark's DiskBlockManager.getFile().
- */
- @VisibleForTesting
- static File getFile(String[] localDirs, int subDirsPerLocalDir, String filename) {
- int hash = JavaUtils.nonNegativeHash(filename);
- String localDir = localDirs[hash % localDirs.length];
- int subDirId = (hash / localDirs.length) % subDirsPerLocalDir;
- return new File(new File(localDir, String.format("%02x", subDirId)), filename);
- }
-
- /** Simply encodes an executor's full ID, which is appId + execId. */
- private static class AppExecId {
- final String appId;
- final String execId;
-
- private AppExecId(String appId, String execId) {
- this.appId = appId;
- this.execId = execId;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
-
- AppExecId appExecId = (AppExecId) o;
- return Objects.equal(appId, appExecId.appId) && Objects.equal(execId, appExecId.execId);
- }
-
- @Override
- public int hashCode() {
- return Objects.hashCode(appId, execId);
- }
-
- @Override
- public String toString() {
- return Objects.toStringHelper(this)
- .add("appId", appId)
- .add("execId", execId)
- .toString();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/4b3bb0e4/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
----------------------------------------------------------------------
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
new file mode 100644
index 0000000..dd08e24
--- /dev/null
+++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Objects;
+import com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.spark.network.buffer.FileSegmentManagedBuffer;
+import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
+import org.apache.spark.network.util.JavaUtils;
+import org.apache.spark.network.util.NettyUtils;
+import org.apache.spark.network.util.TransportConf;
+
+/**
+ * Manages converting shuffle BlockIds into physical segments of local files, from a process outside
+ * of Executors. Each Executor must register its own configuration about where it stores its files
+ * (local dirs) and how (shuffle manager). The logic for retrieval of individual files is replicated
+ * from Spark's FileShuffleBlockResolver and IndexShuffleBlockResolver.
+ *
+ * Executors with shuffle file consolidation are not currently supported, as the consolidation
+ * index is kept in the Executor's memory, unlike IndexShuffleBlockResolver's on-disk index.
+ */
+public class ExternalShuffleBlockResolver {
+ private static final Logger logger = LoggerFactory.getLogger(ExternalShuffleBlockResolver.class);
+
+ // Map containing all registered executors' metadata.
+ private final ConcurrentMap<AppExecId, ExecutorShuffleInfo> executors;
+
+ // Single-threaded Java executor used to perform expensive recursive directory deletion.
+ private final Executor directoryCleaner;
+
+ private final TransportConf conf;
+
+ public ExternalShuffleBlockResolver(TransportConf conf) {
+ this(conf, Executors.newSingleThreadExecutor(
+ // Add `spark` prefix because it will run in NM in Yarn mode.
+ NettyUtils.createThreadFactory("spark-shuffle-directory-cleaner")));
+ }
+
+ // Allows tests to have more control over when directories are cleaned up.
+ @VisibleForTesting
+ ExternalShuffleBlockResolver(TransportConf conf, Executor directoryCleaner) {
+ this.conf = conf;
+ this.executors = Maps.newConcurrentMap();
+ this.directoryCleaner = directoryCleaner;
+ }
+
+ /** Registers a new Executor with all the configuration we need to find its shuffle files. */
+ public void registerExecutor(
+ String appId,
+ String execId,
+ ExecutorShuffleInfo executorInfo) {
+ AppExecId fullId = new AppExecId(appId, execId);
+ logger.info("Registered executor {} with {}", fullId, executorInfo);
+ executors.put(fullId, executorInfo);
+ }
+
+ /**
+ * Obtains a FileSegmentManagedBuffer from a shuffle block id. We expect the blockId to have the
+ * format "shuffle_ShuffleId_MapId_ReduceId" (from ShuffleBlockId), and we additionally make
+ * assumptions about how the hash- and sort-based shuffles store their data.
+ */
+ public ManagedBuffer getBlockData(String appId, String execId, String blockId) {
+ String[] blockIdParts = blockId.split("_");
+ if (blockIdParts.length < 4) {
+ throw new IllegalArgumentException("Unexpected block id format: " + blockId);
+ } else if (!blockIdParts[0].equals("shuffle")) {
+ throw new IllegalArgumentException("Expected shuffle block id, got: " + blockId);
+ }
+ int shuffleId = Integer.parseInt(blockIdParts[1]);
+ int mapId = Integer.parseInt(blockIdParts[2]);
+ int reduceId = Integer.parseInt(blockIdParts[3]);
+
+ ExecutorShuffleInfo executor = executors.get(new AppExecId(appId, execId));
+ if (executor == null) {
+ throw new RuntimeException(
+ String.format("Executor is not registered (appId=%s, execId=%s)", appId, execId));
+ }
+
+ if ("org.apache.spark.shuffle.hash.HashShuffleManager".equals(executor.shuffleManager)) {
+ return getHashBasedShuffleBlockData(executor, blockId);
+ } else if ("org.apache.spark.shuffle.sort.SortShuffleManager".equals(executor.shuffleManager)) {
+ return getSortBasedShuffleBlockData(executor, shuffleId, mapId, reduceId);
+ } else {
+ throw new UnsupportedOperationException(
+ "Unsupported shuffle manager: " + executor.shuffleManager);
+ }
+ }
+
+ /**
+ * Removes our metadata of all executors registered for the given application, and optionally
+ * also deletes the local directories associated with the executors of that application in a
+ * separate thread.
+ *
+ * It is not valid to call registerExecutor() for an executor with this appId after invoking
+ * this method.
+ */
+ public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
+ logger.info("Application {} removed, cleanupLocalDirs = {}", appId, cleanupLocalDirs);
+ Iterator<Map.Entry<AppExecId, ExecutorShuffleInfo>> it = executors.entrySet().iterator();
+ while (it.hasNext()) {
+ Map.Entry<AppExecId, ExecutorShuffleInfo> entry = it.next();
+ AppExecId fullId = entry.getKey();
+ final ExecutorShuffleInfo executor = entry.getValue();
+
+ // Only touch executors associated with the appId that was removed.
+ if (appId.equals(fullId.appId)) {
+ it.remove();
+
+ if (cleanupLocalDirs) {
+ logger.info("Cleaning up executor {}'s {} local dirs", fullId, executor.localDirs.length);
+
+ // Execute the actual deletion in a different thread, as it may take some time.
+ directoryCleaner.execute(new Runnable() {
+ @Override
+ public void run() {
+ deleteExecutorDirs(executor.localDirs);
+ }
+ });
+ }
+ }
+ }
+ }
+
+ /**
+ * Synchronously deletes each directory one at a time.
+ * Should be executed in its own thread, as this may take a long time.
+ */
+ private void deleteExecutorDirs(String[] dirs) {
+ for (String localDir : dirs) {
+ try {
+ JavaUtils.deleteRecursively(new File(localDir));
+ logger.debug("Successfully cleaned up directory: " + localDir);
+ } catch (Exception e) {
+ logger.error("Failed to delete directory: " + localDir, e);
+ }
+ }
+ }
+
+ /**
+ * Hash-based shuffle data is simply stored as one file per block.
+ * This logic is from FileShuffleBlockResolver.
+ */
+ // TODO: Support consolidated hash shuffle files
+ private ManagedBuffer getHashBasedShuffleBlockData(ExecutorShuffleInfo executor, String blockId) {
+ File shuffleFile = getFile(executor.localDirs, executor.subDirsPerLocalDir, blockId);
+ return new FileSegmentManagedBuffer(conf, shuffleFile, 0, shuffleFile.length());
+ }
+
+ /**
+ * Sort-based shuffle data uses an index called "shuffle_ShuffleId_MapId_0.index" into a data file
+ * called "shuffle_ShuffleId_MapId_0.data". This logic is from IndexShuffleBlockResolver,
+ * and the block id format is from ShuffleDataBlockId and ShuffleIndexBlockId.
+ */
+ private ManagedBuffer getSortBasedShuffleBlockData(
+ ExecutorShuffleInfo executor, int shuffleId, int mapId, int reduceId) {
+ File indexFile = getFile(executor.localDirs, executor.subDirsPerLocalDir,
+ "shuffle_" + shuffleId + "_" + mapId + "_0.index");
+
+ DataInputStream in = null;
+ try {
+ in = new DataInputStream(new FileInputStream(indexFile));
+ in.skipBytes(reduceId * 8);
+ long offset = in.readLong();
+ long nextOffset = in.readLong();
+ return new FileSegmentManagedBuffer(
+ conf,
+ getFile(executor.localDirs, executor.subDirsPerLocalDir,
+ "shuffle_" + shuffleId + "_" + mapId + "_0.data"),
+ offset,
+ nextOffset - offset);
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to open file: " + indexFile, e);
+ } finally {
+ if (in != null) {
+ JavaUtils.closeQuietly(in);
+ }
+ }
+ }
+
+ /**
+ * Hashes a filename into the corresponding local directory, in a manner consistent with
+ * Spark's DiskBlockManager.getFile().
+ */
+ @VisibleForTesting
+ static File getFile(String[] localDirs, int subDirsPerLocalDir, String filename) {
+ int hash = JavaUtils.nonNegativeHash(filename);
+ String localDir = localDirs[hash % localDirs.length];
+ int subDirId = (hash / localDirs.length) % subDirsPerLocalDir;
+ return new File(new File(localDir, String.format("%02x", subDirId)), filename);
+ }
+
+ /** Simply encodes an executor's full ID, which is appId + execId. */
+ private static class AppExecId {
+ final String appId;
+ final String execId;
+
+ private AppExecId(String appId, String execId) {
+ this.appId = appId;
+ this.execId = execId;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ AppExecId appExecId = (AppExecId) o;
+ return Objects.equal(appId, appExecId.appId) && Objects.equal(execId, appExecId.execId);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(appId, execId);
+ }
+
+ @Override
+ public String toString() {
+ return Objects.toStringHelper(this)
+ .add("appId", appId)
+ .add("execId", execId)
+ .toString();
+ }
+ }
+}
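
----------------------------------------------------------------------
A minimal standalone sketch (not part of this commit) of the sort-based index
lookup that getSortBasedShuffleBlockData() above performs. The ids and paths
are hypothetical; the reduceId * 8 arithmetic mirrors the resolver: the index
file is a sequence of 8-byte longs in which entry N is the byte offset where
reduce partition N begins in the .data file, so partition reduceId spans
[entry(reduceId), entry(reduceId + 1)).

import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;

public class IndexLookupSketch {
  public static void main(String[] args) throws IOException {
    int shuffleId = 0, mapId = 0, reduceId = 3;     // hypothetical ids
    File localDir = new File("/tmp/shuffle-demo");  // hypothetical local dir
    File indexFile =
        new File(localDir, "shuffle_" + shuffleId + "_" + mapId + "_0.index");

    try (DataInputStream in = new DataInputStream(new FileInputStream(indexFile))) {
      in.skipBytes(reduceId * 8);      // seek to this partition's boundary entry
      long offset = in.readLong();     // where partition reduceId starts
      long nextOffset = in.readLong(); // where the next partition starts
      System.out.printf("partition %d: bytes [%d, %d) of the .data file%n",
          reduceId, offset, nextOffset);
    }
  }
}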
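Similarly, a sketch of the directory hashing in getFile(), which must stay
consistent with Spark's DiskBlockManager.getFile() for the external service to
locate files the Executors wrote. JavaUtils.nonNegativeHash is inlined below
as an assumed equivalent (hashCode clamped non-negative), not the actual
utility.

import java.io.File;

public class GetFileSketch {
  // Assumed equivalent of JavaUtils.nonNegativeHash.
  static int nonNegativeHash(String s) {
    int h = s.hashCode();
    return h != Integer.MIN_VALUE ? Math.abs(h) : 0;  // abs(MIN_VALUE) stays negative
  }

  public static void main(String[] args) {
    String[] localDirs = { "/data/a", "/data/b" };  // hypothetical executor dirs
    int subDirsPerLocalDir = 64;                    // hypothetical config value
    String filename = "shuffle_0_0_0.index";

    int hash = nonNegativeHash(filename);
    String localDir = localDirs[hash % localDirs.length];
    int subDirId = (hash / localDirs.length) % subDirsPerLocalDir;
    // e.g. /data/b/1f/shuffle_0_0_0.index
    System.out.println(
        new File(new File(localDir, String.format("%02x", subDirId)), filename));
  }
}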
http://git-wip-us.apache.org/repos/asf/spark/blob/4b3bb0e4/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java
----------------------------------------------------------------------
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java
index 3f9fe16..73374cd 100644
--- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java
+++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java
@@ -45,14 +45,14 @@ public class ExternalShuffleBlockHandlerSuite {
TransportClient client = mock(TransportClient.class);
OneForOneStreamManager streamManager;
- ExternalShuffleBlockManager blockManager;
+ ExternalShuffleBlockResolver blockResolver;
RpcHandler handler;
@Before
public void beforeEach() {
streamManager = mock(OneForOneStreamManager.class);
- blockManager = mock(ExternalShuffleBlockManager.class);
- handler = new ExternalShuffleBlockHandler(streamManager, blockManager);
+ blockResolver = mock(ExternalShuffleBlockResolver.class);
+ handler = new ExternalShuffleBlockHandler(streamManager, blockResolver);
}
@Test
@@ -62,7 +62,7 @@ public class ExternalShuffleBlockHandlerSuite {
ExecutorShuffleInfo config = new ExecutorShuffleInfo(new String[] {"/a", "/b"}, 16, "sort");
byte[] registerMessage = new RegisterExecutor("app0", "exec1", config).toByteArray();
handler.receive(client, registerMessage, callback);
- verify(blockManager, times(1)).registerExecutor("app0", "exec1", config);
+ verify(blockResolver, times(1)).registerExecutor("app0", "exec1", config);
verify(callback, times(1)).onSuccess((byte[]) any());
verify(callback, never()).onFailure((Throwable) any());
@@ -75,12 +75,12 @@ public class ExternalShuffleBlockHandlerSuite {
ManagedBuffer block0Marker = new NioManagedBuffer(ByteBuffer.wrap(new byte[3]));
ManagedBuffer block1Marker = new NioManagedBuffer(ByteBuffer.wrap(new byte[7]));
- when(blockManager.getBlockData("app0", "exec1", "b0")).thenReturn(block0Marker);
- when(blockManager.getBlockData("app0", "exec1", "b1")).thenReturn(block1Marker);
+ when(blockResolver.getBlockData("app0", "exec1", "b0")).thenReturn(block0Marker);
+ when(blockResolver.getBlockData("app0", "exec1", "b1")).thenReturn(block1Marker);
byte[] openBlocks = new OpenBlocks("app0", "exec1", new String[] { "b0", "b1" }).toByteArray();
handler.receive(client, openBlocks, callback);
- verify(blockManager, times(1)).getBlockData("app0", "exec1", "b0");
- verify(blockManager, times(1)).getBlockData("app0", "exec1", "b1");
+ verify(blockResolver, times(1)).getBlockData("app0", "exec1", "b0");
+ verify(blockResolver, times(1)).getBlockData("app0", "exec1", "b1");
ArgumentCaptor<byte[]> response = ArgumentCaptor.forClass(byte[].class);
verify(callback, times(1)).onSuccess(response.capture());
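
The mock-based tests above lean on the split this rename makes explicit:
ExternalShuffleBlockHandler owns only the RPC surface (RegisterExecutor,
OpenBlocks) and delegates every file lookup to the resolver, so a mocked
ExternalShuffleBlockResolver is enough to exercise the handler end to end.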
http://git-wip-us.apache.org/repos/asf/spark/blob/4b3bb0e4/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManagerSuite.java
----------------------------------------------------------------------
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManagerSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManagerSuite.java
deleted file mode 100644
index dad6428..0000000
--- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManagerSuite.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.shuffle;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-
-import com.google.common.io.CharStreams;
-import org.apache.spark.network.util.SystemPropertyConfigProvider;
-import org.apache.spark.network.util.TransportConf;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-public class ExternalShuffleBlockManagerSuite {
- static String sortBlock0 = "Hello!";
- static String sortBlock1 = "World!";
-
- static String hashBlock0 = "Elementary";
- static String hashBlock1 = "Tabular";
-
- static TestShuffleDataContext dataContext;
-
- static TransportConf conf = new TransportConf(new SystemPropertyConfigProvider());
-
- @BeforeClass
- public static void beforeAll() throws IOException {
- dataContext = new TestShuffleDataContext(2, 5);
-
- dataContext.create();
- // Write some sort and hash data.
- dataContext.insertSortShuffleData(0, 0,
- new byte[][] { sortBlock0.getBytes(), sortBlock1.getBytes() } );
- dataContext.insertHashShuffleData(1, 0,
- new byte[][] { hashBlock0.getBytes(), hashBlock1.getBytes() } );
- }
-
- @AfterClass
- public static void afterAll() {
- dataContext.cleanup();
- }
-
- @Test
- public void testBadRequests() {
- ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(conf);
- // Unregistered executor
- try {
- manager.getBlockData("app0", "exec1", "shuffle_1_1_0");
- fail("Should have failed");
- } catch (RuntimeException e) {
- assertTrue("Bad error message: " + e, e.getMessage().contains("not registered"));
- }
-
- // Invalid shuffle manager
- manager.registerExecutor("app0", "exec2", dataContext.createExecutorInfo("foobar"));
- try {
- manager.getBlockData("app0", "exec2", "shuffle_1_1_0");
- fail("Should have failed");
- } catch (UnsupportedOperationException e) {
- // pass
- }
-
- // Nonexistent shuffle block
- manager.registerExecutor("app0", "exec3",
- dataContext.createExecutorInfo("org.apache.spark.shuffle.sort.SortShuffleManager"));
- try {
- manager.getBlockData("app0", "exec3", "shuffle_1_1_0");
- fail("Should have failed");
- } catch (Exception e) {
- // pass
- }
- }
-
- @Test
- public void testSortShuffleBlocks() throws IOException {
- ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(conf);
- manager.registerExecutor("app0", "exec0",
- dataContext.createExecutorInfo("org.apache.spark.shuffle.sort.SortShuffleManager"));
-
- InputStream block0Stream =
- manager.getBlockData("app0", "exec0", "shuffle_0_0_0").createInputStream();
- String block0 = CharStreams.toString(new InputStreamReader(block0Stream));
- block0Stream.close();
- assertEquals(sortBlock0, block0);
-
- InputStream block1Stream =
- manager.getBlockData("app0", "exec0", "shuffle_0_0_1").createInputStream();
- String block1 = CharStreams.toString(new InputStreamReader(block1Stream));
- block1Stream.close();
- assertEquals(sortBlock1, block1);
- }
-
- @Test
- public void testHashShuffleBlocks() throws IOException {
- ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(conf);
- manager.registerExecutor("app0", "exec0",
- dataContext.createExecutorInfo("org.apache.spark.shuffle.hash.HashShuffleManager"));
-
- InputStream block0Stream =
- manager.getBlockData("app0", "exec0", "shuffle_1_0_0").createInputStream();
- String block0 = CharStreams.toString(new InputStreamReader(block0Stream));
- block0Stream.close();
- assertEquals(hashBlock0, block0);
-
- InputStream block1Stream =
- manager.getBlockData("app0", "exec0", "shuffle_1_0_1").createInputStream();
- String block1 = CharStreams.toString(new InputStreamReader(block1Stream));
- block1Stream.close();
- assertEquals(hashBlock1, block1);
- }
-}
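
----------------------------------------------------------------------
The replacement suite, ExternalShuffleBlockResolverSuite.java (+129 lines in
the diffstat above), is presumably the deleted suite with the class renamed.
As a usage sketch only (not the commit's new suite), here is the renamed API
exercised the way testSortShuffleBlocks above does, reusing the
TestShuffleDataContext helper from the deleted code:

package org.apache.spark.network.shuffle;

import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;

import com.google.common.io.CharStreams;
import org.apache.spark.network.util.SystemPropertyConfigProvider;
import org.apache.spark.network.util.TransportConf;

public class ResolverUsageSketch {
  public static void main(String[] args) throws IOException {
    TestShuffleDataContext dataContext = new TestShuffleDataContext(2, 5);
    dataContext.create();
    dataContext.insertSortShuffleData(0, 0,
        new byte[][] { "Hello!".getBytes(), "World!".getBytes() });

    TransportConf conf = new TransportConf(new SystemPropertyConfigProvider());
    ExternalShuffleBlockResolver resolver = new ExternalShuffleBlockResolver(conf);
    resolver.registerExecutor("app0", "exec0",
        dataContext.createExecutorInfo("org.apache.spark.shuffle.sort.SortShuffleManager"));

    InputStream block0Stream =
        resolver.getBlockData("app0", "exec0", "shuffle_0_0_0").createInputStream();
    System.out.println(CharStreams.toString(new InputStreamReader(block0Stream)));  // Hello!
    block0Stream.close();

    dataContext.cleanup();
  }
}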