You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text version).
Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/21 09:09:55 UTC
[06/10] git commit: Remove shuffle files if they are still present on a machine.
Remove shuffle files if they are still present on a machine.
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/de526ad5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/de526ad5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/de526ad5
Branch: refs/heads/master
Commit: de526ad52766e7f2e597d572e281e2ba2ee52b46
Parents: f84400e
Author: Patrick Wendell <pw...@gmail.com>
Authored: Mon Jan 20 19:11:22 2014 -0800
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Mon Jan 20 19:11:22 2014 -0800
----------------------------------------------------------------------
.../org/apache/spark/storage/ShuffleBlockManager.scala | 13 ++++++++++---
1 file changed, 10 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/de526ad5/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
index e2b2429..173c329 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
@@ -23,10 +23,11 @@ import java.util.concurrent.atomic.AtomicInteger
import scala.collection.JavaConversions._
+import org.apache.spark.Logging
import org.apache.spark.serializer.Serializer
-import org.apache.spark.util.{MetadataCleanerType, MetadataCleaner, TimeStampedHashMap}
-import org.apache.spark.util.collection.{PrimitiveKeyOpenHashMap, PrimitiveVector}
import org.apache.spark.storage.ShuffleBlockManager.ShuffleFileGroup
+import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, TimeStampedHashMap}
+import org.apache.spark.util.collection.{PrimitiveKeyOpenHashMap, PrimitiveVector}
/** A group of writers for a ShuffleMapTask, one writer per reducer. */
private[spark] trait ShuffleWriterGroup {
@@ -58,7 +59,7 @@ private[spark] trait ShuffleWriterGroup {
* files within a ShuffleFileGroups associated with the block's reducer.
*/
private[spark]
-class ShuffleBlockManager(blockManager: BlockManager) {
+class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
def conf = blockManager.conf
// Turning off shuffle file consolidation causes all shuffle Blocks to get their own file.
@@ -106,6 +107,12 @@ class ShuffleBlockManager(blockManager: BlockManager) {
Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
val blockFile = blockManager.diskBlockManager.getFile(blockId)
+ // Because of previous failures, the shuffle file may already exist on this machine.
+ // If so, remove it.
+ if (blockFile.exists()) {
+ val removed = blockFile.delete()
+ logInfo(s"Removed existing shuffle file $blockFile successfully: $removed")
+ }
blockManager.getDiskWriter(blockId, blockFile, serializer, bufferSize)
}
}