Posted to commits@spark.apache.org by rx...@apache.org on 2013/11/05 02:54:15 UTC

[01/12] git commit: Memory-optimized shuffle file consolidation

Updated Branches:
  refs/heads/master b5dc3393a -> 7a26104ab


Memory-optimized shuffle file consolidation

Overhead of each shuffle block for consolidation has been reduced from >300 bytes
to 8 bytes (1 primitive Long). Verified via profiler testing with 1 million shuffle
blocks, where the net overhead was ~8,400,000 bytes (roughly 8 bytes per block plus
a small amount of per-file bookkeeping).

Although the memory-optimized implementation incurs extra CPU overhead, the runtime
of the shuffle phase in this test was only around 2% slower, while the reduce phase
was 40% faster, compared to not using shuffle file consolidation at all.
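
The saving comes from recording a single Long file offset per block and deriving each
block's length from the next recorded offset (or the end of the file), instead of keeping
a full map entry per block. A minimal Scala sketch of that bookkeeping idea follows; the
OffsetIndex name and its methods are illustrative only, not the actual Spark classes:

    import java.io.File

    // Illustrative only: one consolidated on-disk file plus the offsets of the
    // blocks appended to it, in write order. Each block costs one Long of metadata.
    class OffsetIndex(file: File) {
      private var offsets = new Array[Long](8)
      private var n = 0

      /** Record where the next block starts in the file. */
      def record(offset: Long) {
        if (n == offsets.length) {
          val bigger = new Array[Long](offsets.length * 2)
          Array.copy(offsets, 0, bigger, 0, n)
          offsets = bigger
        }
        offsets(n) = offset
        n += 1
      }

      /** (offset, length) of the i-th block; the length comes from the next offset, or EOF. */
      def segment(i: Int): (Long, Long) = {
        val start = offsets(i)
        val end = if (i + 1 < n) offsets(i + 1) else file.length()
        (start, end - start)
      }
    }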


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/84991a1b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/84991a1b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/84991a1b

Branch: refs/heads/master
Commit: 84991a1b91cf1b3d3e51b984877016ff4a506cfc
Parents: b5dc339
Author: Aaron Davidson <aa...@databricks.com>
Authored: Thu Oct 31 15:13:37 2013 -0700
Committer: Aaron Davidson <aa...@databricks.com>
Committed: Sun Nov 3 21:34:13 2013 -0800

----------------------------------------------------------------------
 .../org/apache/spark/storage/BlockManager.scala |  10 +-
 .../spark/storage/BlockObjectWriter.scala       |  15 +-
 .../apache/spark/storage/DiskBlockManager.scala |  54 +----
 .../org/apache/spark/storage/DiskStore.scala    |   4 +-
 .../spark/storage/ShuffleBlockManager.scala     | 212 +++++++++++++++++--
 .../org/apache/spark/util/MetadataCleaner.scala |   2 +-
 .../org/apache/spark/util/PrimitiveVector.scala |  48 +++++
 .../spark/storage/DiskBlockManagerSuite.scala   |  80 +++++++
 8 files changed, 348 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/84991a1b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 76d537f..fbedfbc 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.storage
 
-import java.io.{InputStream, OutputStream}
+import java.io.{File, InputStream, OutputStream}
 import java.nio.{ByteBuffer, MappedByteBuffer}
 
 import scala.collection.mutable.{HashMap, ArrayBuffer}
@@ -47,7 +47,7 @@ private[spark] class BlockManager(
   extends Logging {
 
   val shuffleBlockManager = new ShuffleBlockManager(this)
-  val diskBlockManager = new DiskBlockManager(
+  val diskBlockManager = new DiskBlockManager(shuffleBlockManager,
     System.getProperty("spark.local.dir", System.getProperty("java.io.tmpdir")))
 
   private val blockInfo = new TimeStampedHashMap[BlockId, BlockInfo]
@@ -462,15 +462,11 @@ private[spark] class BlockManager(
    * This is currently used for writing shuffle files out. Callers should handle error
    * cases.
    */
-  def getDiskWriter(blockId: BlockId, filename: String, serializer: Serializer, bufferSize: Int)
+  def getDiskWriter(blockId: BlockId, file: File, serializer: Serializer, bufferSize: Int)
     : BlockObjectWriter = {
     val compressStream: OutputStream => OutputStream = wrapForCompression(blockId, _)
-    val file = diskBlockManager.createBlockFile(blockId, filename, allowAppending =  true)
     val writer = new DiskBlockObjectWriter(blockId, file, serializer, bufferSize, compressStream)
     writer.registerCloseEventHandler(() => {
-      if (shuffleBlockManager.consolidateShuffleFiles) {
-        diskBlockManager.mapBlockToFileSegment(blockId, writer.fileSegment())
-      }
       val myInfo = new ShuffleBlockInfo()
       blockInfo.put(blockId, myInfo)
       myInfo.markReady(writer.fileSegment().length)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/84991a1b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
index 32d2dd0..e49c191 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
@@ -78,11 +78,11 @@ abstract class BlockObjectWriter(val blockId: BlockId) {
 
 /** BlockObjectWriter which writes directly to a file on disk. Appends to the given file. */
 class DiskBlockObjectWriter(
-                             blockId: BlockId,
-                             file: File,
-                             serializer: Serializer,
-                             bufferSize: Int,
-                             compressStream: OutputStream => OutputStream)
+    blockId: BlockId,
+    file: File,
+    serializer: Serializer,
+    bufferSize: Int,
+    compressStream: OutputStream => OutputStream)
   extends BlockObjectWriter(blockId)
   with Logging
 {
@@ -111,8 +111,8 @@ class DiskBlockObjectWriter(
   private var fos: FileOutputStream = null
   private var ts: TimeTrackingOutputStream = null
   private var objOut: SerializationStream = null
-  private var initialPosition = 0L
-  private var lastValidPosition = 0L
+  private val initialPosition = file.length()
+  private var lastValidPosition = initialPosition
   private var initialized = false
   private var _timeWriting = 0L
 
@@ -120,7 +120,6 @@ class DiskBlockObjectWriter(
     fos = new FileOutputStream(file, true)
     ts = new TimeTrackingOutputStream(fos)
     channel = fos.getChannel()
-    initialPosition = channel.position
     lastValidPosition = initialPosition
     bs = compressStream(new FastBufferedOutputStream(ts, bufferSize))
     objOut = serializer.newInstance().serializeStream(bs)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/84991a1b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index bcb58ad..4f9537d 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -20,12 +20,11 @@ package org.apache.spark.storage
 import java.io.File
 import java.text.SimpleDateFormat
 import java.util.{Date, Random}
-import java.util.concurrent.ConcurrentHashMap
 
 import org.apache.spark.Logging
 import org.apache.spark.executor.ExecutorExitCode
 import org.apache.spark.network.netty.{PathResolver, ShuffleSender}
-import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, TimeStampedHashMap, Utils}
+import org.apache.spark.util.Utils
 
 /**
  * Creates and maintains the logical mapping between logical blocks and physical on-disk
@@ -35,7 +34,7 @@ import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, TimeStampedH
  *
  * @param rootDirs The directories to use for storing block files. Data will be hashed among these.
  */
-private[spark] class DiskBlockManager(rootDirs: String) extends PathResolver with Logging {
+private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootDirs: String) extends PathResolver with Logging {
 
   private val MAX_DIR_CREATION_ATTEMPTS: Int = 10
   private val subDirsPerLocalDir = System.getProperty("spark.diskStore.subDirectories", "64").toInt
@@ -47,54 +46,25 @@ private[spark] class DiskBlockManager(rootDirs: String) extends PathResolver wit
   private val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir))
   private var shuffleSender : ShuffleSender = null
 
-  // Stores only Blocks which have been specifically mapped to segments of files
-  // (rather than the default, which maps a Block to a whole file).
-  // This keeps our bookkeeping down, since the file system itself tracks the standalone Blocks. 
-  private val blockToFileSegmentMap = new TimeStampedHashMap[BlockId, FileSegment]
-
-  val metadataCleaner = new MetadataCleaner(MetadataCleanerType.DISK_BLOCK_MANAGER, this.cleanup)
-
   addShutdownHook()
 
   /**
-   * Creates a logical mapping from the given BlockId to a segment of a file.
-   * This will cause any accesses of the logical BlockId to be directed to the specified
-   * physical location.
-   */
-  def mapBlockToFileSegment(blockId: BlockId, fileSegment: FileSegment) {
-    blockToFileSegmentMap.put(blockId, fileSegment)
-  }
-
-  /**
    * Returns the phyiscal file segment in which the given BlockId is located.
    * If the BlockId has been mapped to a specific FileSegment, that will be returned.
    * Otherwise, we assume the Block is mapped to a whole file identified by the BlockId directly.
    */
   def getBlockLocation(blockId: BlockId): FileSegment = {
-    if (blockToFileSegmentMap.internalMap.containsKey(blockId)) {
-      blockToFileSegmentMap.get(blockId).get
-    } else {
-      val file = getFile(blockId.name)
-      new FileSegment(file, 0, file.length())
+    if (blockId.isShuffle) {
+      val segment = shuffleManager.getBlockLocation(blockId.asInstanceOf[ShuffleBlockId])
+      if (segment.isDefined) { return segment.get }
+      // If no special mapping found, assume standard block -> file mapping...
     }
-  }
 
-  /**
-   * Simply returns a File to place the given Block into. This does not physically create the file.
-   * If filename is given, that file will be used. Otherwise, we will use the BlockId to get
-   * a unique filename.
-   */
-  def createBlockFile(blockId: BlockId, filename: String = "", allowAppending: Boolean): File = {
-    val actualFilename = if (filename == "") blockId.name else filename
-    val file = getFile(actualFilename)
-    if (!allowAppending && file.exists()) {
-      throw new IllegalStateException(
-        "Attempted to create file that already exists: " + actualFilename)
-    }
-    file
+    val file = getFile(blockId.name)
+    new FileSegment(file, 0, file.length())
   }
 
-  private def getFile(filename: String): File = {
+  def getFile(filename: String): File = {
     // Figure out which local directory it hashes to, and which subdirectory in that
     val hash = Utils.nonNegativeHash(filename)
     val dirId = hash % localDirs.length
@@ -119,6 +89,8 @@ private[spark] class DiskBlockManager(rootDirs: String) extends PathResolver wit
     new File(subDir, filename)
   }
 
+  def getFile(blockId: BlockId): File = getFile(blockId.name)
+
   private def createLocalDirs(): Array[File] = {
     logDebug("Creating local directories at root dirs '" + rootDirs + "'")
     val dateFormat = new SimpleDateFormat("yyyyMMddHHmmss")
@@ -151,10 +123,6 @@ private[spark] class DiskBlockManager(rootDirs: String) extends PathResolver wit
     }
   }
 
-  private def cleanup(cleanupTime: Long) {
-    blockToFileSegmentMap.clearOldValues(cleanupTime)
-  }
-
   private def addShutdownHook() {
     localDirs.foreach(localDir => Utils.registerShutdownDeleteDir(localDir))
     Runtime.getRuntime.addShutdownHook(new Thread("delete Spark local dirs") {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/84991a1b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index a3c496f..5a1e7b4 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -44,7 +44,7 @@ private class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManage
     val bytes = _bytes.duplicate()
     logDebug("Attempting to put block " + blockId)
     val startTime = System.currentTimeMillis
-    val file = diskManager.createBlockFile(blockId, allowAppending = false)
+    val file = diskManager.getFile(blockId)
     val channel = new FileOutputStream(file).getChannel()
     while (bytes.remaining > 0) {
       channel.write(bytes)
@@ -64,7 +64,7 @@ private class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManage
 
     logDebug("Attempting to write values for block " + blockId)
     val startTime = System.currentTimeMillis
-    val file = diskManager.createBlockFile(blockId, allowAppending = false)
+    val file = diskManager.getFile(blockId)
     val outputStream = new FileOutputStream(file)
     blockManager.dataSerializeStream(blockId, outputStream, values.iterator)
     val length = file.length

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/84991a1b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
index 066e45a..c61febf 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
@@ -17,17 +17,29 @@
 
 package org.apache.spark.storage
 
+import java.io.File
+import java.util
 import java.util.concurrent.ConcurrentLinkedQueue
 import java.util.concurrent.atomic.AtomicInteger
 
+import scala.collection.JavaConversions._
+import scala.collection.mutable
+
+import org.apache.spark.Logging
 import org.apache.spark.serializer.Serializer
+import org.apache.spark.util.{MetadataCleanerType, MetadataCleaner, AGodDamnPrimitiveVector, TimeStampedHashMap}
 
 private[spark]
-class ShuffleWriterGroup(val id: Int, val fileId: Int, val writers: Array[BlockObjectWriter])
+class ShuffleWriterGroup(
+   val mapId: Int,
+   val fileGroup: ShuffleFileGroup,
+   val writers: Array[BlockObjectWriter])
 
 private[spark]
 trait ShuffleBlocks {
+  /** Get a group of writers for this map task. */
   def acquireWriters(mapId: Int): ShuffleWriterGroup
+
   def releaseWriters(group: ShuffleWriterGroup)
 }
 
@@ -46,51 +58,219 @@ trait ShuffleBlocks {
  *       time owns a particular fileId, and this id is returned to a pool when the task finishes.
  */
 private[spark]
-class ShuffleBlockManager(blockManager: BlockManager) {
+class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
   // Turning off shuffle file consolidation causes all shuffle Blocks to get their own file.
   // TODO: Remove this once the shuffle file consolidation feature is stable.
   val consolidateShuffleFiles =
     System.getProperty("spark.shuffle.consolidateFiles", "true").toBoolean
 
-  var nextFileId = new AtomicInteger(0)
-  val unusedFileIds = new ConcurrentLinkedQueue[java.lang.Integer]()
+  /**
+   * Contains a pool of unused ShuffleFileGroups.
+   * One group is needed per concurrent thread (mapper) operating on the same shuffle.
+   */
+  private class ShuffleFileGroupPool {
+    private val nextFileId = new AtomicInteger(0)
+    private val unusedFileGroups = new ConcurrentLinkedQueue[ShuffleFileGroup]()
+
+    def getNextFileId() = nextFileId.getAndIncrement()
+    def getUnusedFileGroup() = unusedFileGroups.poll()
+    def returnFileGroup(group: ShuffleFileGroup) = unusedFileGroups.add(group)
+    def returnFileGroups(groups: Seq[ShuffleFileGroup]) = unusedFileGroups.addAll(groups)
+  }
+
+  type ShuffleId = Int
+  private val shuffleToFileGroupPoolMap = new TimeStampedHashMap[ShuffleId, ShuffleFileGroupPool]
+
+  /**
+   * Maps reducers (of a particular shuffle) to the set of files that have blocks destined for them.
+   * Each reducer will have one ShuffleFile per concurrent thread that executed during mapping.
+   */
+  private val shuffleToReducerToFilesMap =
+    new TimeStampedHashMap[ShuffleId, Array[ConcurrentLinkedQueue[ShuffleFile]]]
+
+  private
+  val metadataCleaner = new MetadataCleaner(MetadataCleanerType.SHUFFLE_BLOCK_MANAGER, this.cleanup)
 
   def forShuffle(shuffleId: Int, numBuckets: Int, serializer: Serializer) = {
+    initializeShuffleMetadata(shuffleId, numBuckets)
+
     new ShuffleBlocks {
-      // Get a group of writers for a map task.
       override def acquireWriters(mapId: Int): ShuffleWriterGroup = {
         val bufferSize = System.getProperty("spark.shuffle.file.buffer.kb", "100").toInt * 1024
-        val fileId = getUnusedFileId()
+        val fileGroup = getUnusedFileGroup(shuffleId, mapId, numBuckets)
         val writers = Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
           val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
           if (consolidateShuffleFiles) {
-            val filename = physicalFileName(shuffleId, bucketId, fileId)
-            blockManager.getDiskWriter(blockId, filename, serializer, bufferSize)
+            blockManager.getDiskWriter(blockId, fileGroup(bucketId).file, serializer, bufferSize)
           } else {
-            blockManager.getDiskWriter(blockId, blockId.name, serializer, bufferSize)
+            val blockFile = blockManager.diskBlockManager.getFile(blockId)
+            blockManager.getDiskWriter(blockId, blockFile, serializer, bufferSize)
           }
         }
-        new ShuffleWriterGroup(mapId, fileId, writers)
+        new ShuffleWriterGroup(mapId, fileGroup, writers)
       }
 
       override def releaseWriters(group: ShuffleWriterGroup) {
-        recycleFileId(group.fileId)
+        if (consolidateShuffleFiles) {
+          val fileGroup = group.fileGroup
+          fileGroup.addMapper(group.mapId)
+          for ((writer, shuffleFile) <- group.writers.zip(fileGroup.files)) {
+            shuffleFile.recordMapOutput(writer.fileSegment().offset)
+          }
+          recycleFileGroup(shuffleId, fileGroup)
+        }
+      }
+    }
+  }
+
+  def initializeShuffleMetadata(shuffleId: Int, numBuckets: Int) {
+    val prev = shuffleToFileGroupPoolMap.putIfAbsent(shuffleId, new ShuffleFileGroupPool())
+    if (prev == None) {
+      val reducerToFilesMap = new Array[ConcurrentLinkedQueue[ShuffleFile]](numBuckets)
+      for (reducerId <- 0 until numBuckets) {
+        reducerToFilesMap(reducerId) = new ConcurrentLinkedQueue[ShuffleFile]()
       }
+      shuffleToReducerToFilesMap.put(shuffleId, reducerToFilesMap)
     }
   }
 
-  private def getUnusedFileId(): Int = {
-    val fileId = unusedFileIds.poll()
-    if (fileId == null) nextFileId.getAndIncrement() else fileId
+  private def getUnusedFileGroup(shuffleId: Int, mapId: Int, numBuckets: Int): ShuffleFileGroup = {
+    if (!consolidateShuffleFiles) { return null }
+
+    val pool = shuffleToFileGroupPoolMap(shuffleId)
+    var fileGroup = pool.getUnusedFileGroup()
+
+    // If we reuse a file group, ensure we maintain mapId monotonicity.
+    val fileGroupsToReturn = mutable.ListBuffer[ShuffleFileGroup]()
+    while (fileGroup != null && fileGroup.maxMapId >= mapId) {
+      fileGroupsToReturn += fileGroup
+      fileGroup = pool.getUnusedFileGroup()
+    }
+    pool.returnFileGroups(fileGroupsToReturn) // re-add incompatible file groups
+
+    if (fileGroup == null) {
+      val fileId = pool.getNextFileId()
+      val files = Array.tabulate[ShuffleFile](numBuckets) { bucketId =>
+        val filename = physicalFileName(shuffleId, bucketId, fileId)
+        val file = blockManager.diskBlockManager.getFile(filename)
+        val shuffleFile = new ShuffleFile(file)
+        shuffleToReducerToFilesMap(shuffleId)(bucketId).add(shuffleFile)
+        shuffleFile
+      }
+      new ShuffleFileGroup(shuffleId, fileId, files)
+    } else {
+      fileGroup
+    }
   }
 
-  private def recycleFileId(fileId: Int) {
+  private def recycleFileGroup(shuffleId: Int, fileGroup: ShuffleFileGroup) {
+    shuffleToFileGroupPoolMap(shuffleId).returnFileGroup(fileGroup)
+  }
+
+  /**
+   * Returns the physical file segment in which the given BlockId is located.
+   * If we have no special mapping, None will be returned.
+   */
+  def getBlockLocation(id: ShuffleBlockId): Option[FileSegment] = {
+    // Search all files associated with the given reducer.
+    // This process is O(m log n) for m threads and n mappers. Could be sweetened to "likely" O(m).
     if (consolidateShuffleFiles) {
-      unusedFileIds.add(fileId)
+      val filesForReducer = shuffleToReducerToFilesMap(id.shuffleId)(id.reduceId)
+      for (file <- filesForReducer) {
+        val segment = file.getFileSegmentFor(id.mapId)
+        if (segment != None) { return segment }
+      }
+
+      logInfo("Failed to find shuffle block: " + id)
     }
+    None
   }
 
   private def physicalFileName(shuffleId: Int, bucketId: Int, fileId: Int) = {
     "merged_shuffle_%d_%d_%d".format(shuffleId, bucketId, fileId)
   }
+
+  private def cleanup(cleanupTime: Long) {
+    shuffleToFileGroupPoolMap.clearOldValues(cleanupTime)
+    shuffleToReducerToFilesMap.clearOldValues(cleanupTime)
+  }
+}
+
+/**
+ * A group of shuffle files, one per reducer.
+ * A particular mapper will be assigned a single ShuffleFileGroup to write its output to.
+ * Mappers must be added in monotonically increasing order by id for efficiency purposes.
+ */
+private[spark]
+class ShuffleFileGroup(val shuffleId: Int, val fileId: Int, val files: Array[ShuffleFile]) {
+  private val mapIds = new AGodDamnPrimitiveVector[Int]()
+
+  files.foreach(_.setShuffleFileGroup(this))
+
+  /** The maximum map id (i.e., last added map task) in this file group. */
+  def maxMapId = if (mapIds.length > 0) mapIds(mapIds.length - 1) else -1
+
+  def apply(bucketId: Int) = files(bucketId)
+
+  def addMapper(mapId: Int) {
+    assert(mapId > maxMapId, "Attempted to insert mapId out-of-order")
+    mapIds += mapId
+  }
+
+  /**
+   * Uses binary search, giving O(log n) runtime.
+   * NB: Could be improved to amortized O(1) for usual access pattern, where nodes are accessed
+   * in order of monotonically increasing mapId. That approach is more fragile in general, however.
+   */
+  def indexOf(mapId: Int): Int = {
+    val index = util.Arrays.binarySearch(mapIds.getUnderlyingArray, 0, mapIds.length, mapId)
+    if (index >= 0) index else -1
+  }
+}
+
+/**
+ * A single, consolidated shuffle file that may contain many actual blocks. All blocks are destined
+ * to the same reducer.
+ */
+private[spark]
+class ShuffleFile(val file: File) {
+  /**
+   * Consecutive offsets of blocks into the file, ordered by position in the file.
+   * This ordering allows us to compute block lengths by examining the following block offset.
+   */
+  val blockOffsets = new AGodDamnPrimitiveVector[Long]()
+
+  /** Back pointer to whichever ShuffleFileGroup this file is a part of. */
+  private var shuffleFileGroup : ShuffleFileGroup = _
+
+  // Required due to circular dependency between ShuffleFileGroup and ShuffleFile.
+  def setShuffleFileGroup(group: ShuffleFileGroup) {
+    assert(shuffleFileGroup == null)
+    shuffleFileGroup = group
+  }
+
+  def recordMapOutput(offset: Long) {
+    blockOffsets += offset
+  }
+
+  /**
+   * Returns the FileSegment associated with the given map task, or
+   * None if this ShuffleFile does not have an entry for it.
+   */
+  def getFileSegmentFor(mapId: Int): Option[FileSegment] = {
+    val index = shuffleFileGroup.indexOf(mapId)
+    if (index >= 0) {
+      val offset = blockOffsets(index)
+      val length =
+        if (index + 1 < blockOffsets.length) {
+          blockOffsets(index + 1) - offset
+        } else {
+          file.length() - offset
+        }
+      assert(length >= 0)
+      return Some(new FileSegment(file, offset, length))
+    } else {
+      None
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/84991a1b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
index 3f96372..67a7f87 100644
--- a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
@@ -59,7 +59,7 @@ object MetadataCleanerType extends Enumeration("MapOutputTracker", "SparkContext
   "ShuffleMapTask", "BlockManager", "DiskBlockManager", "BroadcastVars") {
 
   val MAP_OUTPUT_TRACKER, SPARK_CONTEXT, HTTP_BROADCAST, DAG_SCHEDULER, RESULT_TASK,
-    SHUFFLE_MAP_TASK, BLOCK_MANAGER, DISK_BLOCK_MANAGER, BROADCAST_VARS = Value
+    SHUFFLE_MAP_TASK, BLOCK_MANAGER, SHUFFLE_BLOCK_MANAGER, BROADCAST_VARS = Value
 
   type MetadataCleanerType = Value
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/84991a1b/core/src/main/scala/org/apache/spark/util/PrimitiveVector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/PrimitiveVector.scala b/core/src/main/scala/org/apache/spark/util/PrimitiveVector.scala
new file mode 100644
index 0000000..d316601
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/PrimitiveVector.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+/** Provides a simple, non-threadsafe, array-backed vector that can store primitives. */
+class AGodDamnPrimitiveVector[@specialized(Long, Int, Double) V: ClassManifest]
+    (initialSize: Int = 64)
+{
+  private var numElements = 0
+  private var array = new Array[V](initialSize)
+
+  def apply(index: Int): V = {
+    require(index < numElements)
+    array(index)
+  }
+
+  def +=(value: V) {
+    if (numElements == array.length) { resize(array.length * 2) }
+    array(numElements) = value
+    numElements += 1
+  }
+
+  def length = numElements
+
+  def getUnderlyingArray = array
+
+  /** Resizes the array, dropping elements if the total length decreases. */
+  def resize(newLength: Int) {
+    val newArray = new Array[V](newLength)
+    array.copyToArray(newArray)
+    array = newArray
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/84991a1b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
new file mode 100644
index 0000000..12ca920
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
@@ -0,0 +1,80 @@
+package org.apache.spark.storage
+
+import org.scalatest.{BeforeAndAfterEach, FunSuite}
+import java.io.{FileWriter, File}
+import java.nio.file.Files
+import scala.collection.mutable
+
+class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach {
+
+  val rootDir0 = Files.createTempDirectory("disk-block-manager-suite-0")
+  val rootDir1 = Files.createTempDirectory("disk-block-manager-suite-1")
+  val rootDirs = rootDir0.getFileName + "," + rootDir1.getFileName
+  println("Created root dirs: " + rootDirs)
+
+  val shuffleBlockManager = new ShuffleBlockManager(null) {
+    var idToSegmentMap = mutable.Map[ShuffleBlockId, FileSegment]()
+    override def getBlockLocation(id: ShuffleBlockId) = idToSegmentMap.get(id)
+  }
+
+  var diskBlockManager: DiskBlockManager = _
+
+  override def beforeEach() {
+    diskBlockManager = new DiskBlockManager(shuffleBlockManager, rootDirs)
+    shuffleBlockManager.idToSegmentMap.clear()
+  }
+
+  test("basic block creation") {
+    val blockId = new TestBlockId("test")
+    assertSegmentEquals(blockId, blockId.name, 0, 0)
+
+    val newFile = diskBlockManager.getFile(blockId)
+    writeToFile(newFile, 10)
+    assertSegmentEquals(blockId, blockId.name, 0, 10)
+
+    newFile.delete()
+  }
+
+  test("block appending") {
+    val blockId = new TestBlockId("test")
+    val newFile = diskBlockManager.getFile(blockId)
+    writeToFile(newFile, 15)
+    assertSegmentEquals(blockId, blockId.name, 0, 15)
+    val newFile2 = diskBlockManager.getFile(blockId)
+    assert(newFile === newFile2)
+    writeToFile(newFile2, 12)
+    assertSegmentEquals(blockId, blockId.name, 0, 27)
+    newFile.delete()
+  }
+
+  test("block remapping") {
+    val filename = "test"
+    val blockId0 = new ShuffleBlockId(1, 2, 3)
+    val newFile = diskBlockManager.getFile(filename)
+    writeToFile(newFile, 15)
+    shuffleBlockManager.idToSegmentMap(blockId0) = new FileSegment(newFile, 0, 15)
+    assertSegmentEquals(blockId0, filename, 0, 15)
+
+    val blockId1 = new ShuffleBlockId(1, 2, 4)
+    val newFile2 = diskBlockManager.getFile(filename)
+    writeToFile(newFile2, 12)
+    shuffleBlockManager.idToSegmentMap(blockId1) = new FileSegment(newFile, 15, 12)
+    assertSegmentEquals(blockId1, filename, 15, 12)
+
+    assert(newFile === newFile2)
+    newFile.delete()
+  }
+
+  def assertSegmentEquals(blockId: BlockId, filename: String, offset: Int, length: Int) {
+    val segment = diskBlockManager.getBlockLocation(blockId)
+    assert(segment.file.getName === filename)
+    assert(segment.offset === offset)
+    assert(segment.length === length)
+  }
+
+  def writeToFile(file: File, numBytes: Int) {
+    val writer = new FileWriter(file, true)
+    for (i <- 0 until numBytes) writer.write(i)
+    writer.close()
+  }
+}


[08/12] git commit: Clean up test files properly

Posted by rx...@apache.org.
Clean up test files properly

For some reason, even calling
java.nio.file.Files.createTempDirectory(...).toFile.deleteOnExit()
does not delete the directory on exit. Guava's analogous function
seems to work, however.
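
For reference, a minimal sketch of the pattern this commit switches to (assuming Guava's
com.google.common.io.Files is on the test classpath, as in the diff below). Note that
java.io.File.deleteOnExit(), like File.delete(), only removes a directory that is empty
at JVM exit:

    import com.google.common.io.Files

    // Guava creates the directory eagerly and returns a java.io.File directly,
    // so deleteOnExit() can be registered on it without converting from a Path.
    val rootDir = Files.createTempDir()
    rootDir.deleteOnExit()
    println("Created temp dir: " + rootDir.getAbsolutePath)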


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/39d93ed4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/39d93ed4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/39d93ed4

Branch: refs/heads/master
Commit: 39d93ed4b90b8b302a978df878fd020e7d1fcf56
Parents: a0bb569
Author: Aaron Davidson <aa...@databricks.com>
Authored: Sun Nov 3 21:52:59 2013 -0800
Committer: Aaron Davidson <aa...@databricks.com>
Committed: Sun Nov 3 21:52:59 2013 -0800

----------------------------------------------------------------------
 .../apache/spark/storage/DiskBlockManagerSuite.scala  | 14 +++++++++-----
 1 file changed, 9 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/39d93ed4/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
index 89a7c6e..0b90563 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
@@ -1,15 +1,19 @@
 package org.apache.spark.storage
 
-import org.scalatest.{BeforeAndAfterEach, FunSuite}
 import java.io.{FileWriter, File}
-import java.nio.file.Files
+
 import scala.collection.mutable
 
+import com.google.common.io.Files
+import org.scalatest.{BeforeAndAfterEach, FunSuite}
+
 class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach {
 
-  val rootDir0 = Files.createTempDirectory("disk-block-manager-suite-0")
-  val rootDir1 = Files.createTempDirectory("disk-block-manager-suite-1")
-  val rootDirs = rootDir0.getFileName + "," + rootDir1.getFileName
+  val rootDir0 = Files.createTempDir()
+  rootDir0.deleteOnExit()
+  val rootDir1 = Files.createTempDir()
+  rootDir1.deleteOnExit()
+  val rootDirs = rootDir0.getName + "," + rootDir1.getName
   println("Created root dirs: " + rootDirs)
 
   val shuffleBlockManager = new ShuffleBlockManager(null) {


[03/12] git commit: Address minor comments

Posted by rx...@apache.org.
Address minor comments


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/7453f311
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/7453f311
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/7453f311

Branch: refs/heads/master
Commit: 7453f31181a173f1cacb2c957455bf05e52f43c2
Parents: 84991a1
Author: Aaron Davidson <aa...@databricks.com>
Authored: Fri Nov 1 21:01:04 2013 -0700
Committer: Aaron Davidson <aa...@databricks.com>
Committed: Sun Nov 3 21:34:43 2013 -0800

----------------------------------------------------------------------
 .../apache/spark/storage/DiskBlockManager.scala |  3 +-
 .../spark/storage/ShuffleBlockManager.scala     |  7 +--
 .../org/apache/spark/util/PrimitiveVector.scala | 48 --------------------
 .../spark/util/collection/PrimitiveVector.scala | 47 +++++++++++++++++++
 4 files changed, 53 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/7453f311/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index 4f9537d..bde3d1f 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -34,7 +34,8 @@ import org.apache.spark.util.Utils
  *
  * @param rootDirs The directories to use for storing block files. Data will be hashed among these.
  */
-private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootDirs: String) extends PathResolver with Logging {
+private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootDirs: String)
+  extends PathResolver with Logging {
 
   private val MAX_DIR_CREATION_ATTEMPTS: Int = 10
   private val subDirsPerLocalDir = System.getProperty("spark.diskStore.subDirectories", "64").toInt

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/7453f311/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
index c61febf..d718c87 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
@@ -27,7 +27,8 @@ import scala.collection.mutable
 
 import org.apache.spark.Logging
 import org.apache.spark.serializer.Serializer
-import org.apache.spark.util.{MetadataCleanerType, MetadataCleaner, AGodDamnPrimitiveVector, TimeStampedHashMap}
+import org.apache.spark.util.{MetadataCleanerType, MetadataCleaner, TimeStampedHashMap}
+import org.apache.spark.util.collection.PrimitiveVector
 
 private[spark]
 class ShuffleWriterGroup(
@@ -203,7 +204,7 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
  */
 private[spark]
 class ShuffleFileGroup(val shuffleId: Int, val fileId: Int, val files: Array[ShuffleFile]) {
-  private val mapIds = new AGodDamnPrimitiveVector[Int]()
+  private val mapIds = new PrimitiveVector[Int]()
 
   files.foreach(_.setShuffleFileGroup(this))
 
@@ -238,7 +239,7 @@ class ShuffleFile(val file: File) {
    * Consecutive offsets of blocks into the file, ordered by position in the file.
    * This ordering allows us to compute block lengths by examining the following block offset.
    */
-  val blockOffsets = new AGodDamnPrimitiveVector[Long]()
+  val blockOffsets = new PrimitiveVector[Long]()
 
   /** Back pointer to whichever ShuffleFileGroup this file is a part of. */
   private var shuffleFileGroup : ShuffleFileGroup = _

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/7453f311/core/src/main/scala/org/apache/spark/util/PrimitiveVector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/PrimitiveVector.scala b/core/src/main/scala/org/apache/spark/util/PrimitiveVector.scala
deleted file mode 100644
index d316601..0000000
--- a/core/src/main/scala/org/apache/spark/util/PrimitiveVector.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.util
-
-/** Provides a simple, non-threadsafe, array-backed vector that can store primitives. */
-class AGodDamnPrimitiveVector[@specialized(Long, Int, Double) V: ClassManifest]
-    (initialSize: Int = 64)
-{
-  private var numElements = 0
-  private var array = new Array[V](initialSize)
-
-  def apply(index: Int): V = {
-    require(index < numElements)
-    array(index)
-  }
-
-  def +=(value: V) {
-    if (numElements == array.length) { resize(array.length * 2) }
-    array(numElements) = value
-    numElements += 1
-  }
-
-  def length = numElements
-
-  def getUnderlyingArray = array
-
-  /** Resizes the array, dropping elements if the total length decreases. */
-  def resize(newLength: Int) {
-    val newArray = new Array[V](newLength)
-    array.copyToArray(newArray)
-    array = newArray
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/7453f311/core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala b/core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala
new file mode 100644
index 0000000..721f12b
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+/** Provides a simple, non-threadsafe, array-backed vector that can store primitives. */
+private[spark]
+class PrimitiveVector[@specialized(Long, Int, Double) V: ClassManifest](initialSize: Int = 64) {
+  private var numElements = 0
+  private var array = new Array[V](initialSize)
+
+  def apply(index: Int): V = {
+    require(index < numElements)
+    array(index)
+  }
+
+  def +=(value: V) {
+    if (numElements == array.length) { resize(array.length * 2) }
+    array(numElements) = value
+    numElements += 1
+  }
+
+  def length = numElements
+
+  def getUnderlyingArray = array
+
+  /** Resizes the array, dropping elements if the total length decreases. */
+  def resize(newLength: Int) {
+    val newArray = new Array[V](newLength)
+    array.copyToArray(newArray)
+    array = newArray
+  }
+}


[05/12] git commit: Fix test breakage

Posted by rx...@apache.org.
Fix test breakage


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/3ca52309
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/3ca52309
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/3ca52309

Branch: refs/heads/master
Commit: 3ca52309f2af07e6dbca56017360a0a814b8f9ca
Parents: 1592adf
Author: Aaron Davidson <aa...@databricks.com>
Authored: Sat Nov 2 00:29:31 2013 -0700
Committer: Aaron Davidson <aa...@databricks.com>
Committed: Sun Nov 3 21:34:44 2013 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/storage/DiskBlockManagerSuite.scala     | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/3ca52309/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
index 12ca920..89a7c6e 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
@@ -14,7 +14,7 @@ class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach {
 
   val shuffleBlockManager = new ShuffleBlockManager(null) {
     var idToSegmentMap = mutable.Map[ShuffleBlockId, FileSegment]()
-    override def getBlockLocation(id: ShuffleBlockId) = idToSegmentMap.get(id)
+    override def getBlockLocation(id: ShuffleBlockId) = idToSegmentMap(id)
   }
 
   var diskBlockManager: DiskBlockManager = _


[04/12] git commit: Address Reynold's comments

Posted by rx...@apache.org.
Address Reynold's comments


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/8703898d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/8703898d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/8703898d

Branch: refs/heads/master
Commit: 8703898d3f2c6b6e08b3ef91da67876589aba184
Parents: 3ca5230
Author: Aaron Davidson <aa...@databricks.com>
Authored: Sun Nov 3 00:34:53 2013 -0700
Committer: Aaron Davidson <aa...@databricks.com>
Committed: Sun Nov 3 21:34:44 2013 -0800

----------------------------------------------------------------------
 .../spark/storage/ShuffleBlockManager.scala     | 28 +++++++++++---------
 1 file changed, 16 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/8703898d/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
index d1e3074..57b1a28 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
@@ -45,7 +45,7 @@ trait ShuffleBlocks {
 }
 
 /**
- * Manages assigning disk-based block writers to shuffle tasks. Each shuffle task gets one file and
+ * Manages assigning disk-based block writers to shuffle tasks. Each shuffle task gets one file
  * per reducer (this set of files is called a ShuffleFileGroup).
  *
  * As an optimization to reduce the number of physical shuffle files produced, multiple shuffle
@@ -57,11 +57,13 @@ trait ShuffleBlocks {
  *   - bucketId: The id of the output partition (i.e., reducer id)
  *   - fileId: The unique id identifying a group of "combined shuffle files." Only one task at a
  *       time owns a particular fileId, and this id is returned to a pool when the task finishes.
+ * Each shuffle file is then mapped to a FileSegment, which is a 3-tuple (file, offset, length)
+ * that specifies where in a given file the actual block data is located.
  *
  * Shuffle file metadata is stored in a space-efficient manner. Rather than simply mapping
- * ShuffleBlockIds to FileSegments, each ShuffleFile maintains a list of offsets for each block
- * stored in that file. In order to find the location of a shuffle block, we search all ShuffleFiles
- * destined for the block's reducer.
+ * ShuffleBlockIds directly to FileSegments, each ShuffleFile maintains a list of offsets for each
+ * block stored in that file. In order to find the location of a shuffle block, we search all
+ * ShuffleFiles destined for the block's reducer.
  *
  */
 private[spark]
@@ -98,18 +100,22 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
   private
   val metadataCleaner = new MetadataCleaner(MetadataCleanerType.SHUFFLE_BLOCK_MANAGER, this.cleanup)
 
-  def forShuffle(shuffleId: Int, numBuckets: Int, serializer: Serializer) = {
+  def forShuffle(shuffleId: Int, numBuckets: Int, serializer: Serializer): ShuffleBlocks = {
     initializeShuffleMetadata(shuffleId, numBuckets)
 
     new ShuffleBlocks {
       override def acquireWriters(mapId: Int): ShuffleWriterGroup = {
         val bufferSize = System.getProperty("spark.shuffle.file.buffer.kb", "100").toInt * 1024
-        val fileGroup = getUnusedFileGroup(shuffleId, mapId, numBuckets)
-        val writers = Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
-          val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
-          if (consolidateShuffleFiles) {
+        var fileGroup: ShuffleFileGroup = null
+        val writers = if (consolidateShuffleFiles) {
+          fileGroup = getUnusedFileGroup(shuffleId, mapId, numBuckets)
+          Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
+            val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
             blockManager.getDiskWriter(blockId, fileGroup(bucketId).file, serializer, bufferSize)
-          } else {
+          }
+        } else {
+          Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
+            val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
             val blockFile = blockManager.diskBlockManager.getFile(blockId)
             blockManager.getDiskWriter(blockId, blockFile, serializer, bufferSize)
           }
@@ -142,8 +148,6 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
   }
 
   private def getUnusedFileGroup(shuffleId: Int, mapId: Int, numBuckets: Int): ShuffleFileGroup = {
-    if (!consolidateShuffleFiles) { return null }
-
     val pool = shuffleToFileGroupPoolMap(shuffleId)
     var fileGroup = pool.getUnusedFileGroup()
 


[11/12] git commit: Minor cleanup in ShuffleBlockManager

Posted by rx...@apache.org.
Minor cleanup in ShuffleBlockManager


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/1ba11b1c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/1ba11b1c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/1ba11b1c

Branch: refs/heads/master
Commit: 1ba11b1c6aeda084cb158262ec0aa37a7b70fe32
Parents: 6201e5e
Author: Aaron Davidson <aa...@databricks.com>
Authored: Mon Nov 4 17:16:20 2013 -0800
Committer: Aaron Davidson <aa...@databricks.com>
Committed: Mon Nov 4 17:16:41 2013 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/storage/ShuffleBlockManager.scala | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/1ba11b1c/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
index 6346db3..2f1b049 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
@@ -23,7 +23,6 @@ import java.util.concurrent.atomic.AtomicInteger
 
 import scala.collection.JavaConversions._
 
-import org.apache.spark.Logging
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.util.{MetadataCleanerType, MetadataCleaner, TimeStampedHashMap}
 import org.apache.spark.util.collection.{PrimitiveKeyOpenHashMap, PrimitiveVector}
@@ -59,7 +58,7 @@ private[spark] trait ShuffleWriterGroup {
  * files within a ShuffleFileGroups associated with the block's reducer.
  */
 private[spark]
-class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
+class ShuffleBlockManager(blockManager: BlockManager) {
   // Turning off shuffle file consolidation causes all shuffle Blocks to get their own file.
   // TODO: Remove this once the shuffle file consolidation feature is stable.
   val consolidateShuffleFiles =
@@ -83,7 +82,7 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
   private
   val metadataCleaner = new MetadataCleaner(MetadataCleanerType.SHUFFLE_BLOCK_MANAGER, this.cleanup)
 
-  def forMapTask(shuffleId: Int, mapId: Int, numBuckets: Int, serializer: Serializer) =
+  def forMapTask(shuffleId: Int, mapId: Int, numBuckets: Int, serializer: Serializer) = {
     new ShuffleWriterGroup {
       shuffleStates.putIfAbsent(shuffleId, new ShuffleState())
       private val shuffleState = shuffleStates(shuffleId)
@@ -133,6 +132,7 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
         shuffleState.unusedFileGroups.add(group)
       }
     }
+  }
 
   /**
    * Returns the physical file segment in which the given BlockId is located.
@@ -177,7 +177,7 @@ object ShuffleBlockManager {
      * Note: mapIdToIndex(mapId) returns the index of the mapper into the vector for every
      * reducer.
      */
-    private val blockOffsetsByReducer = Array.tabulate[PrimitiveVector[Long]](files.length) { _ =>
+    private val blockOffsetsByReducer = Array.fill[PrimitiveVector[Long]](files.length) {
       new PrimitiveVector[Long]()
     }
 


[12/12] git commit: Merge pull request #130 from aarondav/shuffle

Posted by rx...@apache.org.
Merge pull request #130 from aarondav/shuffle

Memory-optimized shuffle file consolidation

Reduces overhead of each shuffle block for consolidation from >300 bytes to 8 bytes (1 primitive Long). Verified via profiler testing with 1 million shuffle blocks, where the net overhead was ~8,400,000 bytes.

Although the memory-optimized implementation incurs extra CPU overhead, the runtime of the shuffle phase in this test was only around 2% slower, while the reduce phase was 40% faster, compared to not using shuffle file consolidation at all.

This is accomplished by replacing the map from ShuffleBlockId to FileSegment (i.e., block id to where it's located), which had high overhead due to being a gigantic, timestamped, concurrent map, with a more space-efficient structure. Namely, the following are introduced (I have omitted the word "Shuffle" from some names for clarity):
**ShuffleFile** - there is one ShuffleFile per consolidated shuffle file on disk. We store an array of offsets into the physical shuffle file for each ShuffleMapTask that wrote into the file. This is sufficient to reconstruct FileSegments for mappers that are in the file.
**FileGroup** - contains a set of ShuffleFiles, one per reducer, that a MapTask can use to write its output. There is one FileGroup created per _concurrent_ MapTask. The FileGroup contains an array of the mapIds that have been written to all files in the group. The positions of elements in this array map directly onto the positions in each ShuffleFile's offsets array.

In order to locate the FileSegment associated with a BlockId, we have another structure which maps each reducer to the set of ShuffleFiles that were created for it. (There will be as many ShuffleFiles per reducer as there are FileGroups.) To look up a given ShuffleBlockId (shuffleId, reducerId, mapId), we thus search through all ShuffleFiles associated with that reducer.

As a time optimization, we ensure that FileGroups are only reused for MapTasks with monotonically increasing mapIds. This allows us to perform a binary search to locate a mapId inside a group, and also enables potential future optimization (based on the usual monotonic access order).
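
To make the lookup path concrete, here is a compact Scala sketch of the scheme described
above; Segment, Group, ConsolidatedFile, and lookup are simplified illustrative names, not
the actual Spark classes or APIs:

    import java.io.File
    import java.util.Arrays

    object ShuffleLookupSketch {
      case class Segment(file: File, offset: Long, length: Long)

      // One group per concurrent map task; mapIds are appended in increasing order,
      // so the array is sorted and can be binary-searched.
      class Group(val mapIds: Array[Int])

      // One consolidated file per (group, reducer); offsets(i) is where the output of
      // the mapper at group.mapIds(i) begins within this file.
      class ConsolidatedFile(val file: File, val group: Group, val offsets: Array[Long]) {
        def segmentFor(mapId: Int): Option[Segment] = {
          val i = Arrays.binarySearch(group.mapIds, mapId)  // O(log n) per file
          if (i < 0) None
          else {
            val start = offsets(i)
            val end = if (i + 1 < offsets.length) offsets(i + 1) else file.length()
            Some(Segment(file, start, end - start))
          }
        }
      }

      // Resolving a block (shuffleId, reducerId, mapId) then means scanning the
      // ShuffleFiles registered for that reducer and returning the first hit.
      def lookup(filesForReducer: Seq[ConsolidatedFile], mapId: Int): Option[Segment] =
        filesForReducer.view.flatMap(_.segmentFor(mapId)).headOption
    }

Reusing a file group only for strictly larger mapIds is what keeps each group's mapId array
sorted, which is what makes the binary search above valid.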


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/7a26104a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/7a26104a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/7a26104a

Branch: refs/heads/master
Commit: 7a26104ab7cb492b347ba761ef1f17ca1b9078e4
Parents: b5dc339 1ba11b1
Author: Reynold Xin <rx...@apache.org>
Authored: Mon Nov 4 17:54:06 2013 -0800
Committer: Reynold Xin <rx...@apache.org>
Committed: Mon Nov 4 17:54:06 2013 -0800

----------------------------------------------------------------------
 .../apache/spark/scheduler/ShuffleMapTask.scala |  23 +--
 .../org/apache/spark/storage/BlockManager.scala |  10 +-
 .../spark/storage/BlockObjectWriter.scala       |  15 +-
 .../apache/spark/storage/DiskBlockManager.scala |  49 +----
 .../org/apache/spark/storage/DiskStore.scala    |   4 +-
 .../spark/storage/ShuffleBlockManager.scala     | 189 +++++++++++++++----
 .../spark/storage/StoragePerfTester.scala       |  10 +-
 .../org/apache/spark/util/MetadataCleaner.scala |   2 +-
 .../collection/PrimitiveKeyOpenHashMap.scala    |   6 +
 .../spark/util/collection/PrimitiveVector.scala |  51 +++++
 .../spark/storage/DiskBlockManagerSuite.scala   |  84 +++++++++
 11 files changed, 333 insertions(+), 110 deletions(-)
----------------------------------------------------------------------



[02/12] git commit: Fix weird bug with specialized PrimitiveVector

Posted by rx...@apache.org.
Fix weird bug with specialized PrimitiveVector


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/7d44dec9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/7d44dec9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/7d44dec9

Branch: refs/heads/master
Commit: 7d44dec9bd7c4bbfb8daf4843a0968797e009bea
Parents: 7453f31
Author: Aaron Davidson <aa...@databricks.com>
Authored: Fri Nov 1 21:04:09 2013 -0700
Committer: Aaron Davidson <aa...@databricks.com>
Committed: Sun Nov 3 21:34:43 2013 -0800

----------------------------------------------------------------------
 .../org/apache/spark/util/collection/PrimitiveVector.scala     | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/7d44dec9/core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala b/core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala
index 721f12b..369519c 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala
@@ -21,7 +21,11 @@ package org.apache.spark.util.collection
 private[spark]
 class PrimitiveVector[@specialized(Long, Int, Double) V: ClassManifest](initialSize: Int = 64) {
   private var numElements = 0
-  private var array = new Array[V](initialSize)
+  private var array: Array[V] = _
+
+  // NB: This must be separate from the declaration, otherwise the specialized parent class
+  // will get its own array with the same initial size. TODO: Figure out why...
+  array = new Array[V](initialSize)
 
   def apply(index: Int): V = {
     require(index < numElements)
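
For context, PrimitiveVector is a small append-only vector specialized for primitive element types. The snippet below is a hedged usage sketch based only on the members (+=, apply, length) visible elsewhere in this patch series; since the class is private[spark], it would need to live under the org.apache.spark package to compile.

import org.apache.spark.util.collection.PrimitiveVector

object PrimitiveVectorUsageSketch {
  def main(args: Array[String]) {
    val offsets = new PrimitiveVector[Long]()
    offsets += 0L        // offset of the first block
    offsets += 4096L     // offset of the second block
    assert(offsets.length == 2 && offsets(1) == 4096L)
  }
}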


[09/12] git commit: Add javadoc and remove unused code

Posted by rx...@apache.org.
Add javadoc and remove unused code


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/b0cf19fe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/b0cf19fe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/b0cf19fe

Branch: refs/heads/master
Commit: b0cf19fe3c395e84e730e97ec211d7ef935951e1
Parents: 39d93ed
Author: Aaron Davidson <aa...@databricks.com>
Authored: Sun Nov 3 22:16:58 2013 -0800
Committer: Aaron Davidson <aa...@databricks.com>
Committed: Sun Nov 3 22:16:58 2013 -0800

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/storage/ShuffleBlockManager.scala   | 1 -
 .../org/apache/spark/util/collection/PrimitiveKeyOpenHashMap.scala  | 1 +
 2 files changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b0cf19fe/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
index 8b202ac..a3bb425 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
@@ -85,7 +85,6 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
     def getNextFileId() = nextFileId.getAndIncrement()
     def getUnusedFileGroup() = unusedFileGroups.poll()
     def returnFileGroup(group: ShuffleFileGroup) = unusedFileGroups.add(group)
-    def returnFileGroups(groups: Seq[ShuffleFileGroup]) = unusedFileGroups.addAll(groups)
   }
 
   type ShuffleId = Int

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b0cf19fe/core/src/main/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMap.scala b/core/src/main/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMap.scala
index a119880..d76143e 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMap.scala
@@ -53,6 +53,7 @@ class PrimitiveKeyOpenHashMap[@specialized(Long, Int) K: ClassManifest,
     _values(pos)
   }
 
+  /** Get the value for a given key, or returns elseValue if it doesn't exist. */
   def getOrElse(k: K, elseValue: V): V = {
     val pos = _keySet.getPos(k)
     if (pos >= 0) _values(pos) else elseValue


[07/12] git commit: use OpenHashMap, remove monotonicity requirement, fix failure bug

Posted by rx...@apache.org.
use OpenHashMap, remove monotonicity requirement, fix failure bug


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/a0bb569a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/a0bb569a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/a0bb569a

Branch: refs/heads/master
Commit: a0bb569a818f6ce66c192a3f5782ff56cf58b1d3
Parents: 8703898
Author: Aaron Davidson <aa...@databricks.com>
Authored: Sun Nov 3 20:45:11 2013 -0800
Committer: Aaron Davidson <aa...@databricks.com>
Committed: Sun Nov 3 21:34:56 2013 -0800

----------------------------------------------------------------------
 .../apache/spark/scheduler/ShuffleMapTask.scala |  4 +-
 .../spark/storage/ShuffleBlockManager.scala     | 56 ++++++--------------
 .../spark/storage/StoragePerfTester.scala       |  2 +-
 .../collection/PrimitiveKeyOpenHashMap.scala    |  5 ++
 4 files changed, 26 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a0bb569a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
index 24d97da..c502f8f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
@@ -148,6 +148,7 @@ private[spark] class ShuffleMapTask(
     val blockManager = SparkEnv.get.blockManager
     var shuffle: ShuffleBlocks = null
     var buckets: ShuffleWriterGroup = null
+    var success = false
 
     try {
       // Obtain all the block writers for shuffle blocks.
@@ -179,6 +180,7 @@ private[spark] class ShuffleMapTask(
       shuffleMetrics.shuffleWriteTime = totalTime
       metrics.get.shuffleWriteMetrics = Some(shuffleMetrics)
 
+      success = true
       new MapStatus(blockManager.blockManagerId, compressedSizes)
     } catch { case e: Exception =>
       // If there is an exception from running the task, revert the partial writes
@@ -191,7 +193,7 @@ private[spark] class ShuffleMapTask(
       // Release the writers back to the shuffle block manager.
       if (shuffle != null && buckets != null) {
         buckets.writers.foreach(_.close())
-        shuffle.releaseWriters(buckets)
+        shuffle.releaseWriters(buckets, success)
       }
       // Execute the callbacks on task completion.
       context.executeOnCompleteCallbacks()

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a0bb569a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
index 57b1a28..8b202ac 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
@@ -28,7 +28,7 @@ import scala.collection.mutable
 import org.apache.spark.Logging
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.util.{MetadataCleanerType, MetadataCleaner, TimeStampedHashMap}
-import org.apache.spark.util.collection.PrimitiveVector
+import org.apache.spark.util.collection.{PrimitiveKeyOpenHashMap, PrimitiveVector}
 
 private[spark]
 class ShuffleWriterGroup(
@@ -41,7 +41,8 @@ trait ShuffleBlocks {
   /** Get a group of writers for this map task. */
   def acquireWriters(mapId: Int): ShuffleWriterGroup
 
-  def releaseWriters(group: ShuffleWriterGroup)
+  /** @param success Indicates all writes were successful. If false, no blocks will be recorded. */
+  def releaseWriters(group: ShuffleWriterGroup, success: Boolean)
 }
 
 /**
@@ -123,12 +124,14 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
         new ShuffleWriterGroup(mapId, fileGroup, writers)
       }
 
-      override def releaseWriters(group: ShuffleWriterGroup) {
+      override def releaseWriters(group: ShuffleWriterGroup, success: Boolean) {
         if (consolidateShuffleFiles) {
           val fileGroup = group.fileGroup
-          fileGroup.addMapper(group.mapId)
-          for ((writer, shuffleFile) <- group.writers.zip(fileGroup.files)) {
-            shuffleFile.recordMapOutput(writer.fileSegment().offset)
+          if (success) {
+            fileGroup.addMapper(group.mapId)
+            for ((writer, shuffleFile) <- group.writers.zip(fileGroup.files)) {
+              shuffleFile.recordMapOutput(writer.fileSegment().offset)
+            }
           }
           recycleFileGroup(shuffleId, fileGroup)
         }
@@ -149,18 +152,7 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
 
   private def getUnusedFileGroup(shuffleId: Int, mapId: Int, numBuckets: Int): ShuffleFileGroup = {
     val pool = shuffleToFileGroupPoolMap(shuffleId)
-    var fileGroup = pool.getUnusedFileGroup()
-
-    // If we reuse a file group, ensure we maintain mapId monotonicity.
-    // This means we may create extra ShuffleFileGroups if we're trying to run a map task
-    // that is out-of-order with respect to its mapId (which may happen when failures occur).
-    val fileGroupsToReturn = mutable.ListBuffer[ShuffleFileGroup]()
-    while (fileGroup != null && fileGroup.maxMapId >= mapId) {
-      fileGroupsToReturn += fileGroup
-      fileGroup = pool.getUnusedFileGroup()
-    }
-    pool.returnFileGroups(fileGroupsToReturn) // re-add incompatible file groups
-
+    val fileGroup = pool.getUnusedFileGroup()
     if (fileGroup == null) {
       val fileId = pool.getNextFileId()
       val files = Array.tabulate[ShuffleFile](numBuckets) { bucketId =>
@@ -187,7 +179,6 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
    */
   def getBlockLocation(id: ShuffleBlockId): FileSegment = {
     // Search all files associated with the given reducer.
-    // This process is O(m log n) for m threads and n mappers. Could be sweetened to "likely" O(m).
     val filesForReducer = shuffleToReducerToFilesMap(id.shuffleId)(id.reduceId)
     for (file <- filesForReducer) {
       val segment = file.getFileSegmentFor(id.mapId)
@@ -210,37 +201,24 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
 /**
  * A group of shuffle files, one per reducer.
  * A particular mapper will be assigned a single ShuffleFileGroup to write its output to.
- * Mappers must be added in monotonically increasing order by id for efficiency purposes.
  */
 private[spark]
 class ShuffleFileGroup(val shuffleId: Int, val fileId: Int, val files: Array[ShuffleFile]) {
   /**
-   * Contains the set of mappers that have written to this file group, in the same order as they
-   * have written to their respective files.
+   * Stores the absolute index of each mapId in the files of this group. For instance,
+   * if mapId 5 is the first block in each file, mapIdToIndex(5) = 0.
    */
-  private val mapIds = new PrimitiveVector[Int]()
+  private val mapIdToIndex = new PrimitiveKeyOpenHashMap[Int, Int]()
 
   files.foreach(_.setShuffleFileGroup(this))
 
-  /** The maximum map id (i.e., last added map task) in this file group. */
-  def maxMapId = if (mapIds.length > 0) mapIds(mapIds.length - 1) else -1
-
   def apply(bucketId: Int) = files(bucketId)
 
   def addMapper(mapId: Int) {
-    assert(mapId > maxMapId, "Attempted to insert mapId out-of-order")
-    mapIds += mapId
+    mapIdToIndex(mapId) = mapIdToIndex.size
   }
 
-  /**
-   * Uses binary search, giving O(log n) runtime.
-   * NB: Could be improved to amortized O(1) for usual access pattern, where nodes are accessed
-   * in order of monotonically increasing mapId. That approach is more fragile in general, however.
-   */
-  def indexOf(mapId: Int): Int = {
-    val index = util.Arrays.binarySearch(mapIds.getUnderlyingArray, 0, mapIds.length, mapId)
-    if (index >= 0) index else -1
-  }
+  def indexOf(mapId: Int): Int = mapIdToIndex.getOrElse(mapId, -1)
 }
 
 /**
@@ -252,7 +230,7 @@ class ShuffleFile(val file: File) {
   /**
    * Consecutive offsets of blocks into the file, ordered by position in the file.
    * This ordering allows us to compute block lengths by examining the following block offset.
-   * blockOffsets(i) contains the offset for the mapper in shuffleFileGroup.mapIds(i).
+   * Note: shuffleFileGroup.indexOf(mapId) returns the index of the mapper into this array.
    */
   private val blockOffsets = new PrimitiveVector[Long]()
 
@@ -284,7 +262,7 @@ class ShuffleFile(val file: File) {
           file.length() - offset
         }
       assert(length >= 0)
-      return Some(new FileSegment(file, offset, length))
+      Some(new FileSegment(file, offset, length))
     } else {
       None
     }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a0bb569a/core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala b/core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala
index 7dcadc3..021f6f6 100644
--- a/core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala
@@ -50,7 +50,7 @@ object StoragePerfTester {
         w.close()
       }
 
-      shuffle.releaseWriters(buckets)
+      shuffle.releaseWriters(buckets, true)
     }
 
     val start = System.currentTimeMillis()

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a0bb569a/core/src/main/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMap.scala b/core/src/main/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMap.scala
index 4adf9cf..a119880 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMap.scala
@@ -53,6 +53,11 @@ class PrimitiveKeyOpenHashMap[@specialized(Long, Int) K: ClassManifest,
     _values(pos)
   }
 
+  def getOrElse(k: K, elseValue: V): V = {
+    val pos = _keySet.getPos(k)
+    if (pos >= 0) _values(pos) else elseValue
+  }
+
   /** Set the value for a key */
   def update(k: K, v: V) {
     val pos = _keySet.addWithoutResize(k) & OpenHashSet.POSITION_MASK


[10/12] git commit: Refactor ShuffleBlockManager to reduce public interface

Posted by rx...@apache.org.
Refactor ShuffleBlockManager to reduce public interface

- ShuffleBlocks has been removed and replaced by ShuffleWriterGroup.
- ShuffleWriterGroup no longer contains a reference to a ShuffleFileGroup.
- ShuffleFile has been removed and its contents are now within ShuffleFileGroup.
- ShuffleBlockManager.forShuffle has been replaced by a more stateful forMapTask (see the usage sketch below).
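
A rough usage sketch of the refactored interface, mirroring how ShuffleMapTask drives it in the diff below. Illustrative only: the classes involved are private[spark], and the error handling here simplifies the real task (which also reverts partial writes on failure).

import org.apache.spark.Partitioner
import org.apache.spark.serializer.Serializer
import org.apache.spark.storage.ShuffleBlockManager

object MapOutputSketch {
  // Writes one map task's records into its per-reducer writers, then releases them.
  def writeMapOutput(
      shuffleBlockManager: ShuffleBlockManager,
      shuffleId: Int,
      mapId: Int,
      numBuckets: Int,
      serializer: Serializer,
      partitioner: Partitioner,
      records: Iterator[Product2[Any, Any]]) {
    val shuffle = shuffleBlockManager.forMapTask(shuffleId, mapId, numBuckets, serializer)
    var success = false
    try {
      for (pair <- records) {
        shuffle.writers(partitioner.getPartition(pair._1)).write(pair)
      }
      shuffle.writers.foreach(_.commit())
      success = true
    } finally {
      shuffle.writers.foreach(_.close())
      // Block offsets are recorded in the file group only if all writes succeeded.
      shuffle.releaseWriters(success)
    }
  }
}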


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/6201e5e2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/6201e5e2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/6201e5e2

Branch: refs/heads/master
Commit: 6201e5e2493b0f9addba57f60d6ddb88e572b858
Parents: b0cf19f
Author: Aaron Davidson <aa...@databricks.com>
Authored: Mon Nov 4 09:41:04 2013 -0800
Committer: Aaron Davidson <aa...@databricks.com>
Committed: Mon Nov 4 09:41:04 2013 -0800

----------------------------------------------------------------------
 .../apache/spark/scheduler/ShuffleMapTask.scala |  21 +-
 .../spark/storage/ShuffleBlockManager.scala     | 270 ++++++++-----------
 .../spark/storage/StoragePerfTester.scala       |  10 +-
 3 files changed, 123 insertions(+), 178 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/6201e5e2/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
index c502f8f..1dc71a0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
@@ -146,27 +146,26 @@ private[spark] class ShuffleMapTask(
     metrics = Some(context.taskMetrics)
 
     val blockManager = SparkEnv.get.blockManager
-    var shuffle: ShuffleBlocks = null
-    var buckets: ShuffleWriterGroup = null
+    val shuffleBlockManager = blockManager.shuffleBlockManager
+    var shuffle: ShuffleWriterGroup = null
     var success = false
 
     try {
       // Obtain all the block writers for shuffle blocks.
       val ser = SparkEnv.get.serializerManager.get(dep.serializerClass)
-      shuffle = blockManager.shuffleBlockManager.forShuffle(dep.shuffleId, numOutputSplits, ser)
-      buckets = shuffle.acquireWriters(partitionId)
+      shuffle = shuffleBlockManager.forMapTask(dep.shuffleId, partitionId, numOutputSplits, ser)
 
       // Write the map output to its associated buckets.
       for (elem <- rdd.iterator(split, context)) {
         val pair = elem.asInstanceOf[Product2[Any, Any]]
         val bucketId = dep.partitioner.getPartition(pair._1)
-        buckets.writers(bucketId).write(pair)
+        shuffle.writers(bucketId).write(pair)
       }
 
       // Commit the writes. Get the size of each bucket block (total block size).
       var totalBytes = 0L
       var totalTime = 0L
-      val compressedSizes: Array[Byte] = buckets.writers.map { writer: BlockObjectWriter =>
+      val compressedSizes: Array[Byte] = shuffle.writers.map { writer: BlockObjectWriter =>
         writer.commit()
         val size = writer.fileSegment().length
         totalBytes += size
@@ -185,15 +184,15 @@ private[spark] class ShuffleMapTask(
     } catch { case e: Exception =>
       // If there is an exception from running the task, revert the partial writes
       // and throw the exception upstream to Spark.
-      if (buckets != null) {
-        buckets.writers.foreach(_.revertPartialWrites())
+      if (shuffle != null) {
+        shuffle.writers.foreach(_.revertPartialWrites())
       }
       throw e
     } finally {
       // Release the writers back to the shuffle block manager.
-      if (shuffle != null && buckets != null) {
-        buckets.writers.foreach(_.close())
-        shuffle.releaseWriters(buckets, success)
+      if (shuffle != null && shuffle.writers != null) {
+        shuffle.writers.foreach(_.close())
+        shuffle.releaseWriters(success)
       }
       // Execute the callbacks on task completion.
       context.executeOnCompleteCallbacks()

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/6201e5e2/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
index a3bb425..6346db3 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
@@ -18,31 +18,23 @@
 package org.apache.spark.storage
 
 import java.io.File
-import java.util
 import java.util.concurrent.ConcurrentLinkedQueue
 import java.util.concurrent.atomic.AtomicInteger
 
 import scala.collection.JavaConversions._
-import scala.collection.mutable
 
 import org.apache.spark.Logging
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.util.{MetadataCleanerType, MetadataCleaner, TimeStampedHashMap}
 import org.apache.spark.util.collection.{PrimitiveKeyOpenHashMap, PrimitiveVector}
+import org.apache.spark.storage.ShuffleBlockManager.ShuffleFileGroup
 
-private[spark]
-class ShuffleWriterGroup(
-   val mapId: Int,
-   val fileGroup: ShuffleFileGroup,
-   val writers: Array[BlockObjectWriter])
-
-private[spark]
-trait ShuffleBlocks {
-  /** Get a group of writers for this map task. */
-  def acquireWriters(mapId: Int): ShuffleWriterGroup
+/** A group of writers for a ShuffleMapTask, one writer per reducer. */
+private[spark] trait ShuffleWriterGroup {
+  val writers: Array[BlockObjectWriter]
 
   /** @param success Indicates all writes were successful. If false, no blocks will be recorded. */
-  def releaseWriters(group: ShuffleWriterGroup, success: Boolean)
+  def releaseWriters(success: Boolean)
 }
 
 /**
@@ -50,9 +42,9 @@ trait ShuffleBlocks {
  * per reducer (this set of files is called a ShuffleFileGroup).
  *
  * As an optimization to reduce the number of physical shuffle files produced, multiple shuffle
- * blocks are aggregated into the same file. There is one "combined shuffle file" (ShuffleFile) per
- * reducer per concurrently executing shuffle task. As soon as a task finishes writing to its
- * shuffle files, it releases them for another task.
+ * blocks are aggregated into the same file. There is one "combined shuffle file" per reducer
+ * per concurrently executing shuffle task. As soon as a task finishes writing to its shuffle
+ * files, it releases them for another task.
  * Regarding the implementation of this feature, shuffle files are identified by a 3-tuple:
  *   - shuffleId: The unique id given to the entire shuffle stage.
  *   - bucketId: The id of the output partition (i.e., reducer id)
@@ -62,10 +54,9 @@ trait ShuffleBlocks {
  * that specifies where in a given file the actual block data is located.
  *
  * Shuffle file metadata is stored in a space-efficient manner. Rather than simply mapping
- * ShuffleBlockIds directly to FileSegments, each ShuffleFile maintains a list of offsets for each
- * block stored in that file. In order to find the location of a shuffle block, we search all
- * ShuffleFiles destined for the block's reducer.
- *
+ * ShuffleBlockIds directly to FileSegments, each ShuffleFileGroup maintains a list of offsets for
+ * each block stored in each file. In order to find the location of a shuffle block, we search the
+ * files within a ShuffleFileGroups associated with the block's reducer.
  */
 private[spark]
 class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
@@ -74,102 +65,74 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
   val consolidateShuffleFiles =
     System.getProperty("spark.shuffle.consolidateFiles", "true").toBoolean
 
+  private val bufferSize = System.getProperty("spark.shuffle.file.buffer.kb", "100").toInt * 1024
+
   /**
-   * Contains a pool of unused ShuffleFileGroups.
-   * One group is needed per concurrent thread (mapper) operating on the same shuffle.
+   * Contains all the state related to a particular shuffle. This includes a pool of unused
+   * ShuffleFileGroups, as well as all ShuffleFileGroups that have been created for the shuffle.
    */
-  private class ShuffleFileGroupPool {
-    private val nextFileId = new AtomicInteger(0)
-    private val unusedFileGroups = new ConcurrentLinkedQueue[ShuffleFileGroup]()
-
-    def getNextFileId() = nextFileId.getAndIncrement()
-    def getUnusedFileGroup() = unusedFileGroups.poll()
-    def returnFileGroup(group: ShuffleFileGroup) = unusedFileGroups.add(group)
+  private class ShuffleState() {
+    val nextFileId = new AtomicInteger(0)
+    val unusedFileGroups = new ConcurrentLinkedQueue[ShuffleFileGroup]()
+    val allFileGroups = new ConcurrentLinkedQueue[ShuffleFileGroup]()
   }
 
   type ShuffleId = Int
-  private val shuffleToFileGroupPoolMap = new TimeStampedHashMap[ShuffleId, ShuffleFileGroupPool]
-
-  /**
-   * Maps reducers (of a particular shuffle) to the set of files that have blocks destined for them.
-   * Each reducer will have one ShuffleFile per concurrent thread that executed during mapping.
-   */
-  private val shuffleToReducerToFilesMap =
-    new TimeStampedHashMap[ShuffleId, Array[ConcurrentLinkedQueue[ShuffleFile]]]
+  private val shuffleStates = new TimeStampedHashMap[ShuffleId, ShuffleState]
 
   private
   val metadataCleaner = new MetadataCleaner(MetadataCleanerType.SHUFFLE_BLOCK_MANAGER, this.cleanup)
 
-  def forShuffle(shuffleId: Int, numBuckets: Int, serializer: Serializer): ShuffleBlocks = {
-    initializeShuffleMetadata(shuffleId, numBuckets)
-
-    new ShuffleBlocks {
-      override def acquireWriters(mapId: Int): ShuffleWriterGroup = {
-        val bufferSize = System.getProperty("spark.shuffle.file.buffer.kb", "100").toInt * 1024
-        var fileGroup: ShuffleFileGroup = null
-        val writers = if (consolidateShuffleFiles) {
-          fileGroup = getUnusedFileGroup(shuffleId, mapId, numBuckets)
-          Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
-            val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
-            blockManager.getDiskWriter(blockId, fileGroup(bucketId).file, serializer, bufferSize)
-          }
-        } else {
-          Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
-            val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
-            val blockFile = blockManager.diskBlockManager.getFile(blockId)
-            blockManager.getDiskWriter(blockId, blockFile, serializer, bufferSize)
-          }
+  def forMapTask(shuffleId: Int, mapId: Int, numBuckets: Int, serializer: Serializer) =
+    new ShuffleWriterGroup {
+      shuffleStates.putIfAbsent(shuffleId, new ShuffleState())
+      private val shuffleState = shuffleStates(shuffleId)
+      private var fileGroup: ShuffleFileGroup = null
+
+      val writers: Array[BlockObjectWriter] = if (consolidateShuffleFiles) {
+        fileGroup = getUnusedFileGroup()
+        Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
+          val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
+          blockManager.getDiskWriter(blockId, fileGroup(bucketId), serializer, bufferSize)
+        }
+      } else {
+        Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
+          val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
+          val blockFile = blockManager.diskBlockManager.getFile(blockId)
+          blockManager.getDiskWriter(blockId, blockFile, serializer, bufferSize)
         }
-        new ShuffleWriterGroup(mapId, fileGroup, writers)
       }
 
-      override def releaseWriters(group: ShuffleWriterGroup, success: Boolean) {
+      override def releaseWriters(success: Boolean) {
         if (consolidateShuffleFiles) {
-          val fileGroup = group.fileGroup
           if (success) {
-            fileGroup.addMapper(group.mapId)
-            for ((writer, shuffleFile) <- group.writers.zip(fileGroup.files)) {
-              shuffleFile.recordMapOutput(writer.fileSegment().offset)
-            }
+            val offsets = writers.map(_.fileSegment().offset)
+            fileGroup.recordMapOutput(mapId, offsets)
           }
-          recycleFileGroup(shuffleId, fileGroup)
+          recycleFileGroup(fileGroup)
         }
       }
-    }
-  }
 
-  private def initializeShuffleMetadata(shuffleId: Int, numBuckets: Int) {
-    val prev = shuffleToFileGroupPoolMap.putIfAbsent(shuffleId, new ShuffleFileGroupPool())
-    if (!prev.isDefined) {
-      val reducerToFilesMap = new Array[ConcurrentLinkedQueue[ShuffleFile]](numBuckets)
-      for (reducerId <- 0 until numBuckets) {
-        reducerToFilesMap(reducerId) = new ConcurrentLinkedQueue[ShuffleFile]()
+      private def getUnusedFileGroup(): ShuffleFileGroup = {
+        val fileGroup = shuffleState.unusedFileGroups.poll()
+        if (fileGroup != null) fileGroup else newFileGroup()
+      }
+
+      private def newFileGroup(): ShuffleFileGroup = {
+        val fileId = shuffleState.nextFileId.getAndIncrement()
+        val files = Array.tabulate[File](numBuckets) { bucketId =>
+          val filename = physicalFileName(shuffleId, bucketId, fileId)
+          blockManager.diskBlockManager.getFile(filename)
+        }
+        val fileGroup = new ShuffleFileGroup(fileId, shuffleId, files)
+        shuffleState.allFileGroups.add(fileGroup)
+        fileGroup
       }
-      shuffleToReducerToFilesMap.put(shuffleId, reducerToFilesMap)
-    }
-  }
 
-  private def getUnusedFileGroup(shuffleId: Int, mapId: Int, numBuckets: Int): ShuffleFileGroup = {
-    val pool = shuffleToFileGroupPoolMap(shuffleId)
-    val fileGroup = pool.getUnusedFileGroup()
-    if (fileGroup == null) {
-      val fileId = pool.getNextFileId()
-      val files = Array.tabulate[ShuffleFile](numBuckets) { bucketId =>
-        val filename = physicalFileName(shuffleId, bucketId, fileId)
-        val file = blockManager.diskBlockManager.getFile(filename)
-        val shuffleFile = new ShuffleFile(file)
-        shuffleToReducerToFilesMap(shuffleId)(bucketId).add(shuffleFile)
-        shuffleFile
+      private def recycleFileGroup(group: ShuffleFileGroup) {
+        shuffleState.unusedFileGroups.add(group)
       }
-      new ShuffleFileGroup(shuffleId, fileId, files)
-    } else {
-      fileGroup
     }
-  }
-
-  private def recycleFileGroup(shuffleId: Int, fileGroup: ShuffleFileGroup) {
-    shuffleToFileGroupPoolMap(shuffleId).returnFileGroup(fileGroup)
-  }
 
   /**
    * Returns the physical file segment in which the given BlockId is located.
@@ -177,13 +140,12 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
    * an error condition if we don't find the expected block.
    */
   def getBlockLocation(id: ShuffleBlockId): FileSegment = {
-    // Search all files associated with the given reducer.
-    val filesForReducer = shuffleToReducerToFilesMap(id.shuffleId)(id.reduceId)
-    for (file <- filesForReducer) {
-      val segment = file.getFileSegmentFor(id.mapId)
-      if (segment != None) { return segment.get }
+    // Search all file groups associated with this shuffle.
+    val shuffleState = shuffleStates(id.shuffleId)
+    for (fileGroup <- shuffleState.allFileGroups) {
+      val segment = fileGroup.getFileSegmentFor(id.mapId, id.reduceId)
+      if (segment.isDefined) { return segment.get }
     }
-
     throw new IllegalStateException("Failed to find shuffle block: " + id)
   }
 
@@ -192,78 +154,62 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
   }
 
   private def cleanup(cleanupTime: Long) {
-    shuffleToFileGroupPoolMap.clearOldValues(cleanupTime)
-    shuffleToReducerToFilesMap.clearOldValues(cleanupTime)
+    shuffleStates.clearOldValues(cleanupTime)
   }
 }
 
-/**
- * A group of shuffle files, one per reducer.
- * A particular mapper will be assigned a single ShuffleFileGroup to write its output to.
- */
-private[spark]
-class ShuffleFileGroup(val shuffleId: Int, val fileId: Int, val files: Array[ShuffleFile]) {
-  /**
-   * Stores the absolute index of each mapId in the files of this group. For instance,
-   * if mapId 5 is the first block in each file, mapIdToIndex(5) = 0.
-   */
-  private val mapIdToIndex = new PrimitiveKeyOpenHashMap[Int, Int]()
-
-  files.foreach(_.setShuffleFileGroup(this))
-
-  def apply(bucketId: Int) = files(bucketId)
-
-  def addMapper(mapId: Int) {
-    mapIdToIndex(mapId) = mapIdToIndex.size
-  }
-
-  def indexOf(mapId: Int): Int = mapIdToIndex.getOrElse(mapId, -1)
-}
-
-/**
- * A single, consolidated shuffle file that may contain many actual blocks. All blocks are destined
- * to the same reducer.
- */
 private[spark]
-class ShuffleFile(val file: File) {
+object ShuffleBlockManager {
   /**
-   * Consecutive offsets of blocks into the file, ordered by position in the file.
-   * This ordering allows us to compute block lengths by examining the following block offset.
-   * Note: shuffleFileGroup.indexOf(mapId) returns the index of the mapper into this array.
+   * A group of shuffle files, one per reducer.
+   * A particular mapper will be assigned a single ShuffleFileGroup to write its output to.
    */
-  private val blockOffsets = new PrimitiveVector[Long]()
+  private class ShuffleFileGroup(val shuffleId: Int, val fileId: Int, val files: Array[File]) {
+    /**
+     * Stores the absolute index of each mapId in the files of this group. For instance,
+     * if mapId 5 is the first block in each file, mapIdToIndex(5) = 0.
+     */
+    private val mapIdToIndex = new PrimitiveKeyOpenHashMap[Int, Int]()
+
+    /**
+     * Stores consecutive offsets of blocks into each reducer file, ordered by position in the file.
+     * This ordering allows us to compute block lengths by examining the following block offset.
+     * Note: mapIdToIndex(mapId) returns the index of the mapper into the vector for every
+     * reducer.
+     */
+    private val blockOffsetsByReducer = Array.tabulate[PrimitiveVector[Long]](files.length) { _ =>
+      new PrimitiveVector[Long]()
+    }
 
-  /** Back pointer to whichever ShuffleFileGroup this file is a part of. */
-  private var shuffleFileGroup : ShuffleFileGroup = _
+    def numBlocks = mapIdToIndex.size
 
-  // Required due to circular dependency between ShuffleFileGroup and ShuffleFile.
-  def setShuffleFileGroup(group: ShuffleFileGroup) {
-    assert(shuffleFileGroup == null)
-    shuffleFileGroup = group
-  }
+    def apply(bucketId: Int) = files(bucketId)
 
-  def recordMapOutput(offset: Long) {
-    blockOffsets += offset
-  }
+    def recordMapOutput(mapId: Int, offsets: Array[Long]) {
+      mapIdToIndex(mapId) = numBlocks
+      for (i <- 0 until offsets.length) {
+        blockOffsetsByReducer(i) += offsets(i)
+      }
+    }
 
-  /**
-   * Returns the FileSegment associated with the given map task, or
-   * None if this ShuffleFile does not have an entry for it.
-   */
-  def getFileSegmentFor(mapId: Int): Option[FileSegment] = {
-    val index = shuffleFileGroup.indexOf(mapId)
-    if (index >= 0) {
-      val offset = blockOffsets(index)
-      val length =
-        if (index + 1 < blockOffsets.length) {
-          blockOffsets(index + 1) - offset
-        } else {
-          file.length() - offset
-        }
-      assert(length >= 0)
-      Some(new FileSegment(file, offset, length))
-    } else {
-      None
+    /** Returns the FileSegment associated with the given map task, or None if no entry exists. */
+    def getFileSegmentFor(mapId: Int, reducerId: Int): Option[FileSegment] = {
+      val file = files(reducerId)
+      val blockOffsets = blockOffsetsByReducer(reducerId)
+      val index = mapIdToIndex.getOrElse(mapId, -1)
+      if (index >= 0) {
+        val offset = blockOffsets(index)
+        val length =
+          if (index + 1 < numBlocks) {
+            blockOffsets(index + 1) - offset
+          } else {
+            file.length() - offset
+          }
+        assert(length >= 0)
+        Some(new FileSegment(file, offset, length))
+      } else {
+        None
+      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/6201e5e2/core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala b/core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala
index 021f6f6..1e4db4f 100644
--- a/core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala
@@ -38,19 +38,19 @@ object StoragePerfTester {
     val blockManager = sc.env.blockManager
 
     def writeOutputBytes(mapId: Int, total: AtomicLong) = {
-      val shuffle = blockManager.shuffleBlockManager.forShuffle(1, numOutputSplits,
+      val shuffle = blockManager.shuffleBlockManager.forMapTask(1, mapId, numOutputSplits,
         new KryoSerializer())
-      val buckets = shuffle.acquireWriters(mapId)
+      val writers = shuffle.writers
       for (i <- 1 to recordsPerMap) {
-        buckets.writers(i % numOutputSplits).write(writeData)
+        writers(i % numOutputSplits).write(writeData)
       }
-      buckets.writers.map {w =>
+      writers.map {w =>
         w.commit()
         total.addAndGet(w.fileSegment().length)
         w.close()
       }
 
-      shuffle.releaseWriters(buckets, true)
+      shuffle.releaseWriters(true)
     }
 
     val start = System.currentTimeMillis()


[06/12] git commit: Add documentation and address other comments

Posted by rx...@apache.org.
Add documentation and address other comments


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/1592adfa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/1592adfa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/1592adfa

Branch: refs/heads/master
Commit: 1592adfa259860494353babfb48c80b7d1087379
Parents: 7d44dec
Author: Aaron Davidson <aa...@databricks.com>
Authored: Sat Nov 2 00:19:04 2013 -0700
Committer: Aaron Davidson <aa...@databricks.com>
Committed: Sun Nov 3 21:34:44 2013 -0800

----------------------------------------------------------------------
 .../apache/spark/storage/DiskBlockManager.scala | 12 ++---
 .../spark/storage/ShuffleBlockManager.scala     | 49 ++++++++++++--------
 2 files changed, 35 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/1592adfa/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index bde3d1f..fcd2e97 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -55,14 +55,12 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD
    * Otherwise, we assume the Block is mapped to a whole file identified by the BlockId directly.
    */
   def getBlockLocation(blockId: BlockId): FileSegment = {
-    if (blockId.isShuffle) {
-      val segment = shuffleManager.getBlockLocation(blockId.asInstanceOf[ShuffleBlockId])
-      if (segment.isDefined) { return segment.get }
-      // If no special mapping found, assume standard block -> file mapping...
+    if (blockId.isShuffle && shuffleManager.consolidateShuffleFiles) {
+      shuffleManager.getBlockLocation(blockId.asInstanceOf[ShuffleBlockId])
+    } else {
+      val file = getFile(blockId.name)
+      new FileSegment(file, 0, file.length())
     }
-
-    val file = getFile(blockId.name)
-    new FileSegment(file, 0, file.length())
   }
 
   def getFile(filename: String): File = {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/1592adfa/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
index d718c87..d1e3074 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
@@ -45,18 +45,24 @@ trait ShuffleBlocks {
 }
 
 /**
- * Manages assigning disk-based block writers to shuffle tasks. Each shuffle task gets one writer
- * per reducer.
+ * Manages assigning disk-based block writers to shuffle tasks. Each shuffle task gets one file and
+ * per reducer (this set of files is called a ShuffleFileGroup).
  *
  * As an optimization to reduce the number of physical shuffle files produced, multiple shuffle
- * blocks are aggregated into the same file. There is one "combined shuffle file" per reducer
- * per concurrently executing shuffle task. As soon as a task finishes writing to its shuffle files,
- * it releases them for another task.
+ * blocks are aggregated into the same file. There is one "combined shuffle file" (ShuffleFile) per
+ * reducer per concurrently executing shuffle task. As soon as a task finishes writing to its
+ * shuffle files, it releases them for another task.
  * Regarding the implementation of this feature, shuffle files are identified by a 3-tuple:
  *   - shuffleId: The unique id given to the entire shuffle stage.
  *   - bucketId: The id of the output partition (i.e., reducer id)
  *   - fileId: The unique id identifying a group of "combined shuffle files." Only one task at a
  *       time owns a particular fileId, and this id is returned to a pool when the task finishes.
+ *
+ * Shuffle file metadata is stored in a space-efficient manner. Rather than simply mapping
+ * ShuffleBlockIds to FileSegments, each ShuffleFile maintains a list of offsets for each block
+ * stored in that file. In order to find the location of a shuffle block, we search all ShuffleFiles
+ * destined for the block's reducer.
+ *
  */
 private[spark]
 class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
@@ -124,9 +130,9 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
     }
   }
 
-  def initializeShuffleMetadata(shuffleId: Int, numBuckets: Int) {
+  private def initializeShuffleMetadata(shuffleId: Int, numBuckets: Int) {
     val prev = shuffleToFileGroupPoolMap.putIfAbsent(shuffleId, new ShuffleFileGroupPool())
-    if (prev == None) {
+    if (!prev.isDefined) {
       val reducerToFilesMap = new Array[ConcurrentLinkedQueue[ShuffleFile]](numBuckets)
       for (reducerId <- 0 until numBuckets) {
         reducerToFilesMap(reducerId) = new ConcurrentLinkedQueue[ShuffleFile]()
@@ -142,6 +148,8 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
     var fileGroup = pool.getUnusedFileGroup()
 
     // If we reuse a file group, ensure we maintain mapId monotonicity.
+    // This means we may create extra ShuffleFileGroups if we're trying to run a map task
+    // that is out-of-order with respect to its mapId (which may happen when failures occur).
     val fileGroupsToReturn = mutable.ListBuffer[ShuffleFileGroup]()
     while (fileGroup != null && fileGroup.maxMapId >= mapId) {
       fileGroupsToReturn += fileGroup
@@ -170,21 +178,19 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
 
   /**
    * Returns the physical file segment in which the given BlockId is located.
-   * If we have no special mapping, None will be returned.
+   * This function should only be called if shuffle file consolidation is enabled, as it is
+   * an error condition if we don't find the expected block.
    */
-  def getBlockLocation(id: ShuffleBlockId): Option[FileSegment] = {
+  def getBlockLocation(id: ShuffleBlockId): FileSegment = {
     // Search all files associated with the given reducer.
     // This process is O(m log n) for m threads and n mappers. Could be sweetened to "likely" O(m).
-    if (consolidateShuffleFiles) {
-      val filesForReducer = shuffleToReducerToFilesMap(id.shuffleId)(id.reduceId)
-      for (file <- filesForReducer) {
-        val segment = file.getFileSegmentFor(id.mapId)
-        if (segment != None) { return segment }
-      }
-
-      logInfo("Failed to find shuffle block: " + id)
+    val filesForReducer = shuffleToReducerToFilesMap(id.shuffleId)(id.reduceId)
+    for (file <- filesForReducer) {
+      val segment = file.getFileSegmentFor(id.mapId)
+      if (segment != None) { return segment.get }
     }
-    None
+
+    throw new IllegalStateException("Failed to find shuffle block: " + id)
   }
 
   private def physicalFileName(shuffleId: Int, bucketId: Int, fileId: Int) = {
@@ -204,6 +210,10 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
  */
 private[spark]
 class ShuffleFileGroup(val shuffleId: Int, val fileId: Int, val files: Array[ShuffleFile]) {
+  /**
+   * Contains the set of mappers that have written to this file group, in the same order as they
+   * have written to their respective files.
+   */
   private val mapIds = new PrimitiveVector[Int]()
 
   files.foreach(_.setShuffleFileGroup(this))
@@ -238,8 +248,9 @@ class ShuffleFile(val file: File) {
   /**
    * Consecutive offsets of blocks into the file, ordered by position in the file.
    * This ordering allows us to compute block lengths by examining the following block offset.
+   * blockOffsets(i) contains the offset for the mapper in shuffleFileGroup.mapIds(i).
    */
-  val blockOffsets = new PrimitiveVector[Long]()
+  private val blockOffsets = new PrimitiveVector[Long]()
 
   /** Back pointer to whichever ShuffleFileGroup this file is a part of. */
   private var shuffleFileGroup : ShuffleFileGroup = _