You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2013/11/05 02:54:21 UTC

[07/12] git commit: use OpenHashMap, remove monotonicity requirement, fix failure bug

use OpenHashMap, remove monotonicity requirement, fix failure bug


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/a0bb569a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/a0bb569a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/a0bb569a

Branch: refs/heads/master
Commit: a0bb569a818f6ce66c192a3f5782ff56cf58b1d3
Parents: 8703898
Author: Aaron Davidson <aa...@databricks.com>
Authored: Sun Nov 3 20:45:11 2013 -0800
Committer: Aaron Davidson <aa...@databricks.com>
Committed: Sun Nov 3 21:34:56 2013 -0800

----------------------------------------------------------------------
 .../apache/spark/scheduler/ShuffleMapTask.scala |  4 +-
 .../spark/storage/ShuffleBlockManager.scala     | 56 ++++++--------------
 .../spark/storage/StoragePerfTester.scala       |  2 +-
 .../collection/PrimitiveKeyOpenHashMap.scala    |  5 ++
 4 files changed, 26 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a0bb569a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
index 24d97da..c502f8f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
@@ -148,6 +148,7 @@ private[spark] class ShuffleMapTask(
     val blockManager = SparkEnv.get.blockManager
     var shuffle: ShuffleBlocks = null
     var buckets: ShuffleWriterGroup = null
+    var success = false
 
     try {
       // Obtain all the block writers for shuffle blocks.
@@ -179,6 +180,7 @@ private[spark] class ShuffleMapTask(
       shuffleMetrics.shuffleWriteTime = totalTime
       metrics.get.shuffleWriteMetrics = Some(shuffleMetrics)
 
+      success = true
       new MapStatus(blockManager.blockManagerId, compressedSizes)
     } catch { case e: Exception =>
       // If there is an exception from running the task, revert the partial writes
@@ -191,7 +193,7 @@ private[spark] class ShuffleMapTask(
       // Release the writers back to the shuffle block manager.
       if (shuffle != null && buckets != null) {
         buckets.writers.foreach(_.close())
-        shuffle.releaseWriters(buckets)
+        shuffle.releaseWriters(buckets, success)
       }
       // Execute the callbacks on task completion.
       context.executeOnCompleteCallbacks()

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a0bb569a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
index 57b1a28..8b202ac 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
@@ -28,7 +28,7 @@ import scala.collection.mutable
 import org.apache.spark.Logging
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.util.{MetadataCleanerType, MetadataCleaner, TimeStampedHashMap}
-import org.apache.spark.util.collection.PrimitiveVector
+import org.apache.spark.util.collection.{PrimitiveKeyOpenHashMap, PrimitiveVector}
 
 private[spark]
 class ShuffleWriterGroup(
@@ -41,7 +41,8 @@ trait ShuffleBlocks {
   /** Get a group of writers for this map task. */
   def acquireWriters(mapId: Int): ShuffleWriterGroup
 
-  def releaseWriters(group: ShuffleWriterGroup)
+  /** @param success Indicates all writes were successful. If false, no blocks will be recorded. */
+  def releaseWriters(group: ShuffleWriterGroup, success: Boolean)
 }
 
 /**
@@ -123,12 +124,14 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
         new ShuffleWriterGroup(mapId, fileGroup, writers)
       }
 
-      override def releaseWriters(group: ShuffleWriterGroup) {
+      override def releaseWriters(group: ShuffleWriterGroup, success: Boolean) {
         if (consolidateShuffleFiles) {
           val fileGroup = group.fileGroup
-          fileGroup.addMapper(group.mapId)
-          for ((writer, shuffleFile) <- group.writers.zip(fileGroup.files)) {
-            shuffleFile.recordMapOutput(writer.fileSegment().offset)
+          if (success) {
+            fileGroup.addMapper(group.mapId)
+            for ((writer, shuffleFile) <- group.writers.zip(fileGroup.files)) {
+              shuffleFile.recordMapOutput(writer.fileSegment().offset)
+            }
           }
           recycleFileGroup(shuffleId, fileGroup)
         }
@@ -149,18 +152,7 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
 
   private def getUnusedFileGroup(shuffleId: Int, mapId: Int, numBuckets: Int): ShuffleFileGroup = {
     val pool = shuffleToFileGroupPoolMap(shuffleId)
-    var fileGroup = pool.getUnusedFileGroup()
-
-    // If we reuse a file group, ensure we maintain mapId monotonicity.
-    // This means we may create extra ShuffleFileGroups if we're trying to run a map task
-    // that is out-of-order with respect to its mapId (which may happen when failures occur).
-    val fileGroupsToReturn = mutable.ListBuffer[ShuffleFileGroup]()
-    while (fileGroup != null && fileGroup.maxMapId >= mapId) {
-      fileGroupsToReturn += fileGroup
-      fileGroup = pool.getUnusedFileGroup()
-    }
-    pool.returnFileGroups(fileGroupsToReturn) // re-add incompatible file groups
-
+    val fileGroup = pool.getUnusedFileGroup()
     if (fileGroup == null) {
       val fileId = pool.getNextFileId()
       val files = Array.tabulate[ShuffleFile](numBuckets) { bucketId =>
@@ -187,7 +179,6 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
    */
   def getBlockLocation(id: ShuffleBlockId): FileSegment = {
     // Search all files associated with the given reducer.
-    // This process is O(m log n) for m threads and n mappers. Could be sweetened to "likely" O(m).
     val filesForReducer = shuffleToReducerToFilesMap(id.shuffleId)(id.reduceId)
     for (file <- filesForReducer) {
       val segment = file.getFileSegmentFor(id.mapId)
@@ -210,37 +201,24 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
 /**
  * A group of shuffle files, one per reducer.
  * A particular mapper will be assigned a single ShuffleFileGroup to write its output to.
- * Mappers must be added in monotonically increasing order by id for efficiency purposes.
  */
 private[spark]
 class ShuffleFileGroup(val shuffleId: Int, val fileId: Int, val files: Array[ShuffleFile]) {
   /**
-   * Contains the set of mappers that have written to this file group, in the same order as they
-   * have written to their respective files.
+   * Stores the absolute index of each mapId in the files of this group. For instance,
+   * if mapId 5 is the first block in each file, mapIdToIndex(5) = 0.
    */
-  private val mapIds = new PrimitiveVector[Int]()
+  private val mapIdToIndex = new PrimitiveKeyOpenHashMap[Int, Int]()
 
   files.foreach(_.setShuffleFileGroup(this))
 
-  /** The maximum map id (i.e., last added map task) in this file group. */
-  def maxMapId = if (mapIds.length > 0) mapIds(mapIds.length - 1) else -1
-
   def apply(bucketId: Int) = files(bucketId)
 
   def addMapper(mapId: Int) {
-    assert(mapId > maxMapId, "Attempted to insert mapId out-of-order")
-    mapIds += mapId
+    mapIdToIndex(mapId) = mapIdToIndex.size
   }
 
-  /**
-   * Uses binary search, giving O(log n) runtime.
-   * NB: Could be improved to amortized O(1) for usual access pattern, where nodes are accessed
-   * in order of monotonically increasing mapId. That approach is more fragile in general, however.
-   */
-  def indexOf(mapId: Int): Int = {
-    val index = util.Arrays.binarySearch(mapIds.getUnderlyingArray, 0, mapIds.length, mapId)
-    if (index >= 0) index else -1
-  }
+  def indexOf(mapId: Int): Int = mapIdToIndex.getOrElse(mapId, -1)
 }
 
 /**
@@ -252,7 +230,7 @@ class ShuffleFile(val file: File) {
   /**
    * Consecutive offsets of blocks into the file, ordered by position in the file.
    * This ordering allows us to compute block lengths by examining the following block offset.
-   * blockOffsets(i) contains the offset for the mapper in shuffleFileGroup.mapIds(i).
+   * Note: shuffleFileGroup.indexOf(mapId) returns the index of the mapper into this array.
    */
   private val blockOffsets = new PrimitiveVector[Long]()
 
@@ -284,7 +262,7 @@ class ShuffleFile(val file: File) {
           file.length() - offset
         }
       assert(length >= 0)
-      return Some(new FileSegment(file, offset, length))
+      Some(new FileSegment(file, offset, length))
     } else {
       None
     }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a0bb569a/core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala b/core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala
index 7dcadc3..021f6f6 100644
--- a/core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala
@@ -50,7 +50,7 @@ object StoragePerfTester {
         w.close()
       }
 
-      shuffle.releaseWriters(buckets)
+      shuffle.releaseWriters(buckets, true)
     }
 
     val start = System.currentTimeMillis()

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a0bb569a/core/src/main/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMap.scala b/core/src/main/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMap.scala
index 4adf9cf..a119880 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMap.scala
@@ -53,6 +53,11 @@ class PrimitiveKeyOpenHashMap[@specialized(Long, Int) K: ClassManifest,
     _values(pos)
   }
 
+  def getOrElse(k: K, elseValue: V): V = {
+    val pos = _keySet.getPos(k)
+    if (pos >= 0) _values(pos) else elseValue
+  }
+
   /** Set the value for a key */
   def update(k: K, v: V) {
     val pos = _keySet.addWithoutResize(k) & OpenHashSet.POSITION_MASK