You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/21 09:09:55 UTC

[06/10] git commit: Remove shuffle files if they are still present on a machine.

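Remove shuffle files if they are still present on a machine. Because of previous failures, a shuffle block file may already exist; it is now deleted (and the outcome logged) before a new disk writer is created for that block.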


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/de526ad5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/de526ad5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/de526ad5

Branch: refs/heads/master
Commit: de526ad52766e7f2e597d572e281e2ba2ee52b46
Parents: f84400e
Author: Patrick Wendell <pw...@gmail.com>
Authored: Mon Jan 20 19:11:22 2014 -0800
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Mon Jan 20 19:11:22 2014 -0800

----------------------------------------------------------------------
 .../org/apache/spark/storage/ShuffleBlockManager.scala | 13 ++++++++++---
 1 file changed, 10 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/de526ad5/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
index e2b2429..173c329 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
@@ -23,10 +23,11 @@ import java.util.concurrent.atomic.AtomicInteger
 
 import scala.collection.JavaConversions._
 
+import org.apache.spark.Logging
 import org.apache.spark.serializer.Serializer
-import org.apache.spark.util.{MetadataCleanerType, MetadataCleaner, TimeStampedHashMap}
-import org.apache.spark.util.collection.{PrimitiveKeyOpenHashMap, PrimitiveVector}
 import org.apache.spark.storage.ShuffleBlockManager.ShuffleFileGroup
+import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, TimeStampedHashMap}
+import org.apache.spark.util.collection.{PrimitiveKeyOpenHashMap, PrimitiveVector}
 
 /** A group of writers for a ShuffleMapTask, one writer per reducer. */
 private[spark] trait ShuffleWriterGroup {
@@ -58,7 +59,7 @@ private[spark] trait ShuffleWriterGroup {
  * files within a ShuffleFileGroups associated with the block's reducer.
  */
 private[spark]
-class ShuffleBlockManager(blockManager: BlockManager) {
+class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
   def conf = blockManager.conf
 
   // Turning off shuffle file consolidation causes all shuffle Blocks to get their own file.
@@ -106,6 +107,12 @@ class ShuffleBlockManager(blockManager: BlockManager) {
         Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
           val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
           val blockFile = blockManager.diskBlockManager.getFile(blockId)
+          // Because of previous failures, the shuffle file may already exist on this machine.
+          // If so, remove it.
+          if (blockFile.exists()) {
+            val removed = blockFile.delete()
+            logInfo(s"Removed existing shuffle file $blockFile successfully: $removed")
+          }
           blockManager.getDiskWriter(blockId, blockFile, serializer, bufferSize)
         }
       }