You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text rendering).
Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/21 09:09:50 UTC
[01/10] git commit: Removing docs on akka options
Updated Branches:
refs/heads/master c67d3d8be -> 77b986f66
Removing docs on akka options
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/cdb003e3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/cdb003e3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/cdb003e3
Branch: refs/heads/master
Commit: cdb003e3766b6e530d1ac51b16d155e59c329ab5
Parents: 792d908
Author: Patrick Wendell <pw...@gmail.com>
Authored: Mon Jan 20 16:35:26 2014 -0800
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Mon Jan 20 16:40:58 2014 -0800
----------------------------------------------------------------------
core/src/main/scala/org/apache/spark/SparkConf.scala | 10 +++++++++-
docs/configuration.md | 7 -------
2 files changed, 9 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/cdb003e3/core/src/main/scala/org/apache/spark/SparkConf.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 951bfd7..45d19bc 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -192,7 +192,15 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
}
/** Get all akka conf variables set on this SparkConf */
- def getAkkaConf: Seq[(String, String)] = getAll.filter {case (k, v) => k.startsWith("akka.")}
+ def getAkkaConf: Seq[(String, String)] =
+ /* This is currently undocumented. If we want to make this public we should consider
+ * nesting options under the spark namespace to avoid conflicts with user akka options.
+ * Otherwise users configuring their own akka code via system properties could mess up
+ * spark's akka options.
+ *
+ * E.g. spark.akka.option.x.y.x = "value"
+ */
+ getAll.filter {case (k, v) => k.startsWith("akka.")}
/** Does the configuration contain a given parameter? */
def contains(key: String): Boolean = settings.contains(key)
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/cdb003e3/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index 0086490..4c2e9cc 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -379,13 +379,6 @@ Apart from these, the following properties are also available, and may be useful
Too large a value decreases parallelism during broadcast (makes it slower); however, if it is too small, <code>BlockManager</code> might take a performance hit.
</td>
</tr>
-<tr>
- <td>akka.x.y....</td>
- <td>value</td>
- <td>
- An arbitrary akka configuration can be set directly on spark conf and it is applied for all the ActorSystems created spark wide for that SparkContext and its assigned executors as well.
- </td>
-</tr>
<tr>
<td>spark.shuffle.consolidateFiles</td>
[03/10] git commit: Bug fix for reporting of spill output
Posted by pw...@apache.org.
Bug fix for reporting of spill output
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/1b299142
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/1b299142
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/1b299142
Branch: refs/heads/master
Commit: 1b299142a8d5feb70677dce993127de466266ff6
Parents: 54867e9
Author: Patrick Wendell <pw...@gmail.com>
Authored: Mon Jan 20 18:34:00 2014 -0800
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Mon Jan 20 18:34:00 2014 -0800
----------------------------------------------------------------------
.../org/apache/spark/util/collection/ExternalAppendOnlyMap.scala | 4 +++-
1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/1b299142/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 64e9b43..8df8b4f 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -168,6 +168,8 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
if (objectsWritten == serializerBatchSize) {
writer.commit()
+ writer.close()
+ _diskBytesSpilled += writer.bytesWritten
writer = getNewWriter
objectsWritten = 0
}
@@ -176,8 +178,8 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
if (objectsWritten > 0) writer.commit()
} finally {
// Partial failures cannot be tolerated; do not revert partial writes
- _diskBytesSpilled += writer.bytesWritten
writer.close()
+ _diskBytesSpilled += writer.bytesWritten
}
currentMap = new SizeTrackingAppendOnlyMap[K, C]
spilledMaps.append(new DiskMapIterator(file, blockId))
[05/10] git commit: Fixing speculation bug
Posted by pw...@apache.org.
Fixing speculation bug
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/f84400e8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/f84400e8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/f84400e8
Branch: refs/heads/master
Commit: f84400e86c459b1ebbe452bc21e821a11cd72c29
Parents: c324ac1
Author: Patrick Wendell <pw...@gmail.com>
Authored: Mon Jan 20 19:05:03 2014 -0800
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Mon Jan 20 19:05:03 2014 -0800
----------------------------------------------------------------------
.../src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f84400e8/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 5ad00a1..b9ce94b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -233,7 +233,7 @@ private[spark] class TaskSetManager(
/** Check whether a task is currently running an attempt on a given host */
private def hasAttemptOnHost(taskIndex: Int, host: String): Boolean = {
- !taskAttempts(taskIndex).exists(_.host == host)
+ taskAttempts(taskIndex).exists(_.host == host)
}
/**
[10/10] git commit: Merge pull request #480 from pwendell/0.9-fixes
Posted by pw...@apache.org.
Merge pull request #480 from pwendell/0.9-fixes
Handful of 0.9 fixes
This patch contains a few fixes for Spark 0.9.0 based on the last release candidate.
@mridulm gets credit for reporting most of the issues here. Many of the fixes here are based on his work in #477 and follow-up discussion with him.
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/77b986f6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/77b986f6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/77b986f6
Branch: refs/heads/master
Commit: 77b986f6616e6f7e0be9e46bb355829686f9845b
Parents: c67d3d8 a9bcc98
Author: Patrick Wendell <pw...@gmail.com>
Authored: Tue Jan 21 00:09:42 2014 -0800
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Tue Jan 21 00:09:42 2014 -0800
----------------------------------------------------------------------
.../main/scala/org/apache/spark/SparkConf.scala | 10 ++++-
.../org/apache/spark/rdd/CheckpointRDD.scala | 4 +-
.../apache/spark/scheduler/TaskSetManager.scala | 2 +-
.../spark/storage/BlockObjectWriter.scala | 5 ++-
.../spark/storage/ShuffleBlockManager.scala | 16 ++++++--
.../util/collection/ExternalAppendOnlyMap.scala | 41 ++++++++++++++++----
docs/configuration.md | 11 ++----
7 files changed, 65 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
[09/10] git commit: Style clean-up
Posted by pw...@apache.org.
Style clean-up
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/a9bcc980
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/a9bcc980
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/a9bcc980
Branch: refs/heads/master
Commit: a9bcc980b693bf5b0959caccf74367fc70348041
Parents: a917a87
Author: Patrick Wendell <pw...@gmail.com>
Authored: Mon Jan 20 23:42:24 2014 -0800
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Tue Jan 21 00:05:28 2014 -0800
----------------------------------------------------------------------
.../org/apache/spark/storage/ShuffleBlockManager.scala | 9 ++++++---
.../spark/util/collection/ExternalAppendOnlyMap.scala | 11 +++--------
2 files changed, 9 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a9bcc980/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
index 173c329..bb07c8c 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
@@ -109,9 +109,12 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
val blockFile = blockManager.diskBlockManager.getFile(blockId)
// Because of previous failures, the shuffle file may already exist on this machine.
// If so, remove it.
- if (blockFile.exists()) {
- val removed = blockFile.delete()
- logInfo(s"Removed existing shuffle file $blockFile successfully: $removed")
+ if (blockFile.exists) {
+ if (blockFile.delete()) {
+ logInfo(s"Removed existing shuffle file $blockFile")
+ } else {
+ logWarning(s"Failed to remove existing shuffle file $blockFile")
+ }
}
blockManager.getDiskWriter(blockId, blockFile, serializer, bufferSize)
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a9bcc980/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 792f29d..fb73636 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -173,15 +173,10 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
* most likely require using the file channel API.
*/
- val codec = new LZFCompressionCodec(sparkConf)
-
+ val shouldCompress = blockManager.shouldCompress(blockId)
+ val compressionCodec = new LZFCompressionCodec(sparkConf)
def wrapForCompression(outputStream: OutputStream) = {
- blockManager.shouldCompress(blockId) match {
- case true =>
- codec.compressedOutputStream(outputStream)
- case false =>
- outputStream
- }
+ if (shouldCompress) compressionCodec.compressedOutputStream(outputStream) else outputStream
}
def getNewWriter = new DiskBlockObjectWriter(blockId, file, serializer, fileBufferSize,
[08/10] git commit: Adding small code comment
Posted by pw...@apache.org.
Adding small code comment
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/a917a87e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/a917a87e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/a917a87e
Branch: refs/heads/master
Commit: a917a87e02c5f094f81fa7f9e1702135f440da82
Parents: d46df96
Author: Patrick Wendell <pw...@gmail.com>
Authored: Mon Jan 20 23:11:45 2014 -0800
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Mon Jan 20 23:11:45 2014 -0800
----------------------------------------------------------------------
.../main/scala/org/apache/spark/storage/BlockObjectWriter.scala | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a917a87e/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
index 924ec7c..530712b 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
@@ -146,7 +146,8 @@ private[spark] class DiskBlockObjectWriter(
override def commit(): Long = {
if (initialized) {
- // NOTE: Flush the serializer first and then the compressed/buffered output stream
+ // NOTE: Because Kryo doesn't flush the underlying stream we explicitly flush both the
+ // serializer stream and the lower level stream.
objOut.flush()
bs.flush()
val prevPos = lastValidPosition
[02/10] git commit: Minor fixes
Posted by pw...@apache.org.
Minor fixes
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/54867e95
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/54867e95
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/54867e95
Branch: refs/heads/master
Commit: 54867e9566c81d2f60e7a5ffb52a8d2204eb47f8
Parents: cdb003e
Author: Patrick Wendell <pw...@gmail.com>
Authored: Mon Jan 20 18:33:21 2014 -0800
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Mon Jan 20 18:33:21 2014 -0800
----------------------------------------------------------------------
.../main/scala/org/apache/spark/storage/BlockObjectWriter.scala | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/54867e95/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
index 48cec4b..924ec7c 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
@@ -138,6 +138,7 @@ private[spark] class DiskBlockObjectWriter(
fos = null
ts = null
objOut = null
+ initialized = false
}
}
@@ -175,7 +176,6 @@ private[spark] class DiskBlockObjectWriter(
}
override def fileSegment(): FileSegment = {
- val bytesWritten = lastValidPosition - initialPosition
new FileSegment(file, initialPosition, bytesWritten)
}
[07/10] git commit: Avoid matching attempt files in the checkpoint
Posted by pw...@apache.org.
Avoid matching attempt files in the checkpoint
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/d46df96d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/d46df96d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/d46df96d
Branch: refs/heads/master
Commit: d46df96de3e3b4fb69541aa5cfa2a811c8b860eb
Parents: de526ad
Author: Patrick Wendell <pw...@gmail.com>
Authored: Mon Jan 20 20:02:02 2014 -0800
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Mon Jan 20 20:03:23 2014 -0800
----------------------------------------------------------------------
core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d46df96d/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
index 83109d1..30e578d 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
@@ -43,8 +43,8 @@ class CheckpointRDD[T: ClassTag](sc: SparkContext, val checkpointPath: String)
val numPartitions =
// listStatus can throw exception if path does not exist.
if (fs.exists(cpath)) {
- val dirContents = fs.listStatus(cpath)
- val partitionFiles = dirContents.map(_.getPath.toString).filter(_.contains("part-")).sorted
+ val dirContents = fs.listStatus(cpath).map(_.getPath)
+ val partitionFiles = dirContents.filter(_.getName.startsWith("part-")).map(_.toString).sorted
val numPart = partitionFiles.size
if (numPart > 0 && (! partitionFiles(0).endsWith(CheckpointRDD.splitIdToFile(0)) ||
! partitionFiles(numPart-1).endsWith(CheckpointRDD.splitIdToFile(numPart-1)))) {
[04/10] git commit: Force use of LZF when spilling data
Posted by pw...@apache.org.
Force use of LZF when spilling data
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/c324ac10
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/c324ac10
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/c324ac10
Branch: refs/heads/master
Commit: c324ac10ee208c53dabd54e5c0e1885aea456811
Parents: 1b29914
Author: Patrick Wendell <pw...@gmail.com>
Authored: Mon Jan 20 19:00:48 2014 -0800
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Mon Jan 20 19:00:48 2014 -0800
----------------------------------------------------------------------
.../util/collection/ExternalAppendOnlyMap.scala | 42 +++++++++++++++++---
docs/configuration.md | 4 +-
2 files changed, 39 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/c324ac10/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 8df8b4f..792f29d 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -20,14 +20,15 @@ package org.apache.spark.util.collection
import java.io._
import java.util.Comparator
-import it.unimi.dsi.fastutil.io.FastBufferedInputStream
-
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
+import it.unimi.dsi.fastutil.io.FastBufferedInputStream
+
import org.apache.spark.{Logging, SparkEnv}
-import org.apache.spark.serializer.{KryoDeserializationStream, KryoSerializationStream, Serializer}
-import org.apache.spark.storage.{BlockId, BlockManager, DiskBlockManager, DiskBlockObjectWriter}
+import org.apache.spark.io.LZFCompressionCodec
+import org.apache.spark.serializer.{KryoDeserializationStream, Serializer}
+import org.apache.spark.storage.{BlockId, BlockManager, DiskBlockObjectWriter}
/**
* An append-only map that spills sorted content to disk when there is insufficient space for it
@@ -153,9 +154,38 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
.format(mapSize / (1024 * 1024), spillCount, if (spillCount > 1) "s" else ""))
val (blockId, file) = diskBlockManager.createTempBlock()
- val compressStream: OutputStream => OutputStream = blockManager.wrapForCompression(blockId, _)
+ /* IMPORTANT NOTE: To avoid having to keep large object graphs in memory, this approach
+ * closes and re-opens serialization and compression streams within each file. This makes some
+ * assumptions about the way that serialization and compression streams work, specifically:
+ *
+ * 1) The serializer input streams do not pre-fetch data from the underlying stream.
+ *
+ * 2) Several compression streams can be opened, written to, and flushed on the write path
+ * while only one compression input stream is created on the read path
+ *
+ * In practice (1) is only true for Java, so we add a special fix below to make it work for
+ * Kryo. (2) is only true for LZF and not Snappy, so we coerce this to use LZF.
+ *
+ * To avoid making these assumptions we should create an intermediate stream that batches
+ * objects and sends an EOF to the higher layer streams to make sure they never prefetch data.
+ * This is a bit tricky because, within each segment, you'd need to track the total number
+ * of bytes written and then re-wind and write it at the beginning of the segment. This will
+ * most likely require using the file channel API.
+ */
+
+ val codec = new LZFCompressionCodec(sparkConf)
+
+ def wrapForCompression(outputStream: OutputStream) = {
+ blockManager.shouldCompress(blockId) match {
+ case true =>
+ codec.compressedOutputStream(outputStream)
+ case false =>
+ outputStream
+ }
+ }
+
def getNewWriter = new DiskBlockObjectWriter(blockId, file, serializer, fileBufferSize,
- compressStream, syncWrites)
+ wrapForCompression, syncWrites)
var writer = getNewWriter
var objectsWritten = 0
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/c324ac10/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index 4c2e9cc..be548e3 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -158,7 +158,9 @@ Apart from these, the following properties are also available, and may be useful
<td>spark.shuffle.spill.compress</td>
<td>true</td>
<td>
- Whether to compress data spilled during shuffles.
+ Whether to compress data spilled during shuffles. If enabled, spill compression
+ always uses the `org.apache.spark.io.LZFCompressionCodec` codec,
+ regardless of the value of `spark.io.compression.codec`.
</td>
</tr>
<tr>
[06/10] git commit: Remove shuffle files if they are still present on a machine.
Posted by pw...@apache.org.
Remove shuffle files if they are still present on a machine.
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/de526ad5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/de526ad5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/de526ad5
Branch: refs/heads/master
Commit: de526ad52766e7f2e597d572e281e2ba2ee52b46
Parents: f84400e
Author: Patrick Wendell <pw...@gmail.com>
Authored: Mon Jan 20 19:11:22 2014 -0800
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Mon Jan 20 19:11:22 2014 -0800
----------------------------------------------------------------------
.../org/apache/spark/storage/ShuffleBlockManager.scala | 13 ++++++++++---
1 file changed, 10 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/de526ad5/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
index e2b2429..173c329 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
@@ -23,10 +23,11 @@ import java.util.concurrent.atomic.AtomicInteger
import scala.collection.JavaConversions._
+import org.apache.spark.Logging
import org.apache.spark.serializer.Serializer
-import org.apache.spark.util.{MetadataCleanerType, MetadataCleaner, TimeStampedHashMap}
-import org.apache.spark.util.collection.{PrimitiveKeyOpenHashMap, PrimitiveVector}
import org.apache.spark.storage.ShuffleBlockManager.ShuffleFileGroup
+import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, TimeStampedHashMap}
+import org.apache.spark.util.collection.{PrimitiveKeyOpenHashMap, PrimitiveVector}
/** A group of writers for a ShuffleMapTask, one writer per reducer. */
private[spark] trait ShuffleWriterGroup {
@@ -58,7 +59,7 @@ private[spark] trait ShuffleWriterGroup {
* files within a ShuffleFileGroups associated with the block's reducer.
*/
private[spark]
-class ShuffleBlockManager(blockManager: BlockManager) {
+class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
def conf = blockManager.conf
// Turning off shuffle file consolidation causes all shuffle Blocks to get their own file.
@@ -106,6 +107,12 @@ class ShuffleBlockManager(blockManager: BlockManager) {
Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
val blockFile = blockManager.diskBlockManager.getFile(blockId)
+ // Because of previous failures, the shuffle file may already exist on this machine.
+ // If so, remove it.
+ if (blockFile.exists()) {
+ val removed = blockFile.delete()
+ logInfo(s"Removed existing shuffle file $blockFile successfully: $removed")
+ }
blockManager.getDiskWriter(blockId, blockFile, serializer, bufferSize)
}
}