You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2013/12/24 23:36:58 UTC

[1/2] git commit: Clean up shuffle files once their metadata is gone

Updated Branches:
  refs/heads/master 3bf7c708d -> c2dd6bcd6


Clean up shuffle files once their metadata is gone

Previously, we would only clean the in-memory metadata for shuffle files,
leaving the shuffle files themselves on disk.

Additionally, fixes a bug where the Metadata Cleaner was ignoring
type-specific TTLs.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/0647ec97
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/0647ec97
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/0647ec97

Branch: refs/heads/master
Commit: 0647ec97573dc267c7a6b4679fb938b4dfa4fbb6
Parents: 440e531
Author: Aaron Davidson <aa...@databricks.com>
Authored: Thu Dec 19 15:10:48 2013 -0800
Committer: Aaron Davidson <aa...@databricks.com>
Committed: Thu Dec 19 15:40:48 2013 -0800

----------------------------------------------------------------------
 .../spark/storage/ShuffleBlockManager.scala     | 25 +++++++++++++++++---
 .../org/apache/spark/util/MetadataCleaner.scala |  2 +-
 .../apache/spark/util/TimeStampedHashMap.scala  | 15 +++++++++---
 3 files changed, 35 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0647ec97/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
index e828e1d..212ef65 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
@@ -70,10 +70,16 @@ class ShuffleBlockManager(blockManager: BlockManager) {
    * Contains all the state related to a particular shuffle. This includes a pool of unused
    * ShuffleFileGroups, as well as all ShuffleFileGroups that have been created for the shuffle.
    */
-  private class ShuffleState() {
+  private class ShuffleState(val numBuckets: Int) {
     val nextFileId = new AtomicInteger(0)
     val unusedFileGroups = new ConcurrentLinkedQueue[ShuffleFileGroup]()
     val allFileGroups = new ConcurrentLinkedQueue[ShuffleFileGroup]()
+
+    /**
+     * The mapIds of all map tasks completed on this Executor for this shuffle.
+     * NB: This is only populated if consolidateShuffleFiles is FALSE. We don't need it otherwise.
+     */
+    val completedMapTasks = new ConcurrentLinkedQueue[Int]()
   }
 
   type ShuffleId = Int
@@ -84,7 +90,7 @@ class ShuffleBlockManager(blockManager: BlockManager) {
 
   def forMapTask(shuffleId: Int, mapId: Int, numBuckets: Int, serializer: Serializer) = {
     new ShuffleWriterGroup {
-      shuffleStates.putIfAbsent(shuffleId, new ShuffleState())
+      shuffleStates.putIfAbsent(shuffleId, new ShuffleState(numBuckets))
       private val shuffleState = shuffleStates(shuffleId)
       private var fileGroup: ShuffleFileGroup = null
 
@@ -109,6 +115,8 @@ class ShuffleBlockManager(blockManager: BlockManager) {
             fileGroup.recordMapOutput(mapId, offsets)
           }
           recycleFileGroup(fileGroup)
+        } else {
+          shuffleState.completedMapTasks.add(mapId)
         }
       }
 
@@ -154,7 +162,18 @@ class ShuffleBlockManager(blockManager: BlockManager) {
   }
 
   private def cleanup(cleanupTime: Long) {
-    shuffleStates.clearOldValues(cleanupTime)
+    shuffleStates.clearOldValues(cleanupTime, (shuffleId, state) => {
+      if (consolidateShuffleFiles) {
+        for (fileGroup <- state.allFileGroups; file <- fileGroup.files) {
+          file.delete()
+        }
+      } else {
+        for (mapId <- state.completedMapTasks; reduceId <- 0 until state.numBuckets) {
+          val blockId = new ShuffleBlockId(shuffleId, mapId, reduceId)
+          blockManager.diskBlockManager.getFile(blockId).delete()
+        }
+      }
+    })
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0647ec97/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
index 7b41ef8..fe56960 100644
--- a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
@@ -27,7 +27,7 @@ import org.apache.spark.Logging
 class MetadataCleaner(cleanerType: MetadataCleanerType.MetadataCleanerType, cleanupFunc: (Long) => Unit) extends Logging {
   val name = cleanerType.toString
 
-  private val delaySeconds = MetadataCleaner.getDelaySeconds
+  private val delaySeconds = MetadataCleaner.getDelaySeconds(cleanerType)
   private val periodSeconds = math.max(10, delaySeconds / 10)
   private val timer = new Timer(name + " cleanup timer", true)
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0647ec97/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala b/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala
index dbff571..181ae2f 100644
--- a/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala
@@ -104,19 +104,28 @@ class TimeStampedHashMap[A, B] extends Map[A, B]() with Logging {
   def toMap: immutable.Map[A, B] = iterator.toMap
 
   /**
-   * Removes old key-value pairs that have timestamp earlier than `threshTime`
+   * Removes old key-value pairs that have timestamp earlier than `threshTime`,
+   * calling the supplied function on each such entry before removing.
    */
-  def clearOldValues(threshTime: Long) {
+  def clearOldValues(threshTime: Long, f: (A, B) => Unit) {
     val iterator = internalMap.entrySet().iterator()
-    while(iterator.hasNext) {
+    while (iterator.hasNext) {
       val entry = iterator.next()
       if (entry.getValue._2 < threshTime) {
+        f(entry.getKey, entry.getValue._1)
         logDebug("Removing key " + entry.getKey)
         iterator.remove()
       }
     }
   }
 
+  /**
+   * Removes old key-value pairs that have timestamp earlier than `threshTime`
+   */
+  def clearOldValues(threshTime: Long) {
+    clearOldValues(threshTime, (_, _) => ())
+  }
+
   private def currentTime: Long = System.currentTimeMillis()
 
 }


[2/2] git commit: Merge pull request #279 from aarondav/shuffle-cleanup0

Posted by pw...@apache.org.
Merge pull request #279 from aarondav/shuffle-cleanup0

Clean up shuffle files once their metadata is gone

Previously, we would only clean the in-memory metadata for consolidated shuffle files.

Additionally, fixes a bug where the Metadata Cleaner was ignoring type-specific TTLs.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/c2dd6bcd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/c2dd6bcd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/c2dd6bcd

Branch: refs/heads/master
Commit: c2dd6bcd6eda2c5a741138e9a984c40d2635ca33
Parents: 3bf7c70 0647ec9
Author: Patrick Wendell <pw...@gmail.com>
Authored: Tue Dec 24 14:36:47 2013 -0800
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Tue Dec 24 14:36:47 2013 -0800

----------------------------------------------------------------------
 .../spark/storage/ShuffleBlockManager.scala     | 25 +++++++++++++++++---
 .../org/apache/spark/util/MetadataCleaner.scala |  2 +-
 .../apache/spark/util/TimeStampedHashMap.scala  | 15 +++++++++---
 3 files changed, 35 insertions(+), 7 deletions(-)
----------------------------------------------------------------------