Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/21 09:09:50 UTC

[01/10] git commit: Removing docs on akka options

Updated Branches:
  refs/heads/master c67d3d8be -> 77b986f66


Removing docs on akka options


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/cdb003e3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/cdb003e3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/cdb003e3

Branch: refs/heads/master
Commit: cdb003e3766b6e530d1ac51b16d155e59c329ab5
Parents: 792d908
Author: Patrick Wendell <pw...@gmail.com>
Authored: Mon Jan 20 16:35:26 2014 -0800
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Mon Jan 20 16:40:58 2014 -0800

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/SparkConf.scala | 10 +++++++++-
 docs/configuration.md                                |  7 -------
 2 files changed, 9 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/cdb003e3/core/src/main/scala/org/apache/spark/SparkConf.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 951bfd7..45d19bc 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -192,7 +192,15 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
   }
 
   /** Get all akka conf variables set on this SparkConf */
-  def getAkkaConf: Seq[(String, String)] =  getAll.filter {case (k, v) => k.startsWith("akka.")}
+  def getAkkaConf: Seq[(String, String)] =
+    /* This is currently undocumented. If we want to make this public we should consider
+     * nesting options under the spark namespace to avoid conflicts with user akka options.
+     * Otherwise users configuring their own akka code via system properties could mess up
+     * spark's akka options.
+     *
+     *   E.g. spark.akka.option.x.y.x = "value"
+     */
+    getAll.filter {case (k, v) => k.startsWith("akka.")}
 
   /** Does the configuration contain a given parameter? */
   def contains(key: String): Boolean = settings.contains(key)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/cdb003e3/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index 0086490..4c2e9cc 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -379,13 +379,6 @@ Apart from these, the following properties are also available, and may be useful
     Too large a value decreases parallelism during broadcast (makes it slower); however, if it is too small, <code>BlockManager</code> might take a performance hit.
   </td>
 </tr>
-<tr>
-  <td>akka.x.y....</td>
-  <td>value</td>
-  <td>
-    An arbitrary akka configuration can be set directly on spark conf and it is applied for all the ActorSystems created spark wide for that SparkContext and its assigned executors as well.
-  </td>
-</tr>
 
 <tr>
   <td>spark.shuffle.consolidateFiles</td>
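
The comment added to SparkConf.scala above sketches a namespaced scheme for
exposing akka options safely. As a rough illustration only (the
"spark.akka.option." prefix and the helper below are hypothetical, not part
of this commit), the translation could look like:

    import org.apache.spark.SparkConf

    // Hypothetical: collect options set under a spark-owned prefix and strip
    // it back off, so plain akka.* system properties set by user code cannot
    // collide with the options Spark forwards to its own ActorSystems.
    def namespacedAkkaConf(conf: SparkConf): Seq[(String, String)] =
      conf.getAll
        .filter { case (k, _) => k.startsWith("spark.akka.option.") }
        .map { case (k, v) => (k.stripPrefix("spark.akka.option."), v) }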


[03/10] git commit: Bug fix for reporting of spill output

Posted by pw...@apache.org.
Bug fix for reporting of spill output


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/1b299142
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/1b299142
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/1b299142

Branch: refs/heads/master
Commit: 1b299142a8d5feb70677dce993127de466266ff6
Parents: 54867e9
Author: Patrick Wendell <pw...@gmail.com>
Authored: Mon Jan 20 18:34:00 2014 -0800
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Mon Jan 20 18:34:00 2014 -0800

----------------------------------------------------------------------
 .../org/apache/spark/util/collection/ExternalAppendOnlyMap.scala | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/1b299142/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 64e9b43..8df8b4f 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -168,6 +168,8 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
 
         if (objectsWritten == serializerBatchSize) {
           writer.commit()
+          writer.close()
+          _diskBytesSpilled += writer.bytesWritten
           writer = getNewWriter
           objectsWritten = 0
         }
@@ -176,8 +178,8 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
       if (objectsWritten > 0) writer.commit()
     } finally {
       // Partial failures cannot be tolerated; do not revert partial writes
-      _diskBytesSpilled += writer.bytesWritten
       writer.close()
+      _diskBytesSpilled += writer.bytesWritten
     }
     currentMap = new SizeTrackingAppendOnlyMap[K, C]
     spilledMaps.append(new DiskMapIterator(file, blockId))
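
The bug: writer.bytesWritten was sampled before close(), so bytes still
sitting in the stream buffers were never counted, and writers closed at
batch boundaries were not counted at all. The pitfall in plain java.io
terms (a standalone illustration, not Spark code):

    import java.io._

    val file = File.createTempFile("spill", ".tmp")
    val out = new BufferedOutputStream(new FileOutputStream(file))
    out.write(new Array[Byte](10))
    // file.length() here is typically still 0: the bytes sit in the buffer.
    out.close()
    assert(file.length() == 10)  // only after close() is the size reliable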


[05/10] git commit: Fixing speculation bug

Posted by pw...@apache.org.
Fixing speculation bug


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/f84400e8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/f84400e8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/f84400e8

Branch: refs/heads/master
Commit: f84400e86c459b1ebbe452bc21e821a11cd72c29
Parents: c324ac1
Author: Patrick Wendell <pw...@gmail.com>
Authored: Mon Jan 20 19:05:03 2014 -0800
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Mon Jan 20 19:05:03 2014 -0800

----------------------------------------------------------------------
 .../src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f84400e8/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 5ad00a1..b9ce94b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -233,7 +233,7 @@ private[spark] class TaskSetManager(
 
   /** Check whether a task is currently running an attempt on a given host */
   private def hasAttemptOnHost(taskIndex: Int, host: String): Boolean = {
-    !taskAttempts(taskIndex).exists(_.host == host)
+    taskAttempts(taskIndex).exists(_.host == host)
   }
 
   /**
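
The whole fix is the dropped "!": hasAttemptOnHost was returning the
opposite of what its doc comment promises, misleading the speculation code
that consults it. In isolation (names local to this example):

    case class TaskInfo(host: String)
    val taskAttempts = Map(0 -> List(TaskInfo("host-a")))

    def hasAttemptOnHost(taskIndex: Int, host: String): Boolean =
      taskAttempts(taskIndex).exists(_.host == host)

    assert(hasAttemptOnHost(0, "host-a"))   // an attempt runs there: true
    assert(!hasAttemptOnHost(0, "host-b"))  // no attempt on that host: false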


[10/10] git commit: Merge pull request #480 from pwendell/0.9-fixes

Posted by pw...@apache.org.
Merge pull request #480 from pwendell/0.9-fixes

Handful of 0.9 fixes

This patch contains a handful of fixes for Spark 0.9.0, based on the last release candidate.

@mridulm gets credit for reporting most of the issues here. Many of the fixes are based on his work in #477 and follow-up discussion with him.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/77b986f6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/77b986f6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/77b986f6

Branch: refs/heads/master
Commit: 77b986f6616e6f7e0be9e46bb355829686f9845b
Parents: c67d3d8 a9bcc98
Author: Patrick Wendell <pw...@gmail.com>
Authored: Tue Jan 21 00:09:42 2014 -0800
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Tue Jan 21 00:09:42 2014 -0800

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/SparkConf.scala | 10 ++++-
 .../org/apache/spark/rdd/CheckpointRDD.scala    |  4 +-
 .../apache/spark/scheduler/TaskSetManager.scala |  2 +-
 .../spark/storage/BlockObjectWriter.scala       |  5 ++-
 .../spark/storage/ShuffleBlockManager.scala     | 16 ++++++--
 .../util/collection/ExternalAppendOnlyMap.scala | 41 ++++++++++++++++----
 docs/configuration.md                           | 11 ++----
 7 files changed, 65 insertions(+), 24 deletions(-)
----------------------------------------------------------------------



[09/10] git commit: Style clean-up

Posted by pw...@apache.org.
Style clean-up


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/a9bcc980
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/a9bcc980
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/a9bcc980

Branch: refs/heads/master
Commit: a9bcc980b693bf5b0959caccf74367fc70348041
Parents: a917a87
Author: Patrick Wendell <pw...@gmail.com>
Authored: Mon Jan 20 23:42:24 2014 -0800
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Tue Jan 21 00:05:28 2014 -0800

----------------------------------------------------------------------
 .../org/apache/spark/storage/ShuffleBlockManager.scala   |  9 ++++++---
 .../spark/util/collection/ExternalAppendOnlyMap.scala    | 11 +++--------
 2 files changed, 9 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a9bcc980/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
index 173c329..bb07c8c 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
@@ -109,9 +109,12 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
           val blockFile = blockManager.diskBlockManager.getFile(blockId)
           // Because of previous failures, the shuffle file may already exist on this machine.
           // If so, remove it.
-          if (blockFile.exists()) {
-            val removed = blockFile.delete()
-            logInfo(s"Removed existing shuffle file $blockFile successfully: $removed")
+          if (blockFile.exists) {
+            if (blockFile.delete()) {
+              logInfo(s"Removed existing shuffle file $blockFile")
+            } else {
+              logWarning(s"Failed to remove existing shuffle file $blockFile")
+            }
           }
           blockManager.getDiskWriter(blockId, blockFile, serializer, bufferSize)
         }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a9bcc980/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 792f29d..fb73636 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -173,15 +173,10 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
      * most likely require using the file channel API.
      */
 
-    val codec = new LZFCompressionCodec(sparkConf)
-
+    val shouldCompress = blockManager.shouldCompress(blockId)
+    val compressionCodec = new LZFCompressionCodec(sparkConf)
     def wrapForCompression(outputStream: OutputStream) = {
-      blockManager.shouldCompress(blockId) match {
-        case true =>
-          codec.compressedOutputStream(outputStream)
-        case false =>
-          outputStream
-      }
+      if (shouldCompress) compressionCodec.compressedOutputStream(outputStream) else outputStream
     }
 
     def getNewWriter = new DiskBlockObjectWriter(blockId, file, serializer, fileBufferSize,
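
The boolean match-to-if rewrite in miniature; behavior is unchanged, the
if/else form is simply the idiomatic Scala spelling (GZIP stands in for the
real codec here, purely to keep the example self-contained):

    import java.io.OutputStream
    import java.util.zip.GZIPOutputStream

    def wrap(shouldCompress: Boolean, out: OutputStream): OutputStream =
      if (shouldCompress) new GZIPOutputStream(out) else out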


[08/10] git commit: Adding small code comment

Posted by pw...@apache.org.
Adding small code comment


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/a917a87e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/a917a87e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/a917a87e

Branch: refs/heads/master
Commit: a917a87e02c5f094f81fa7f9e1702135f440da82
Parents: d46df96
Author: Patrick Wendell <pw...@gmail.com>
Authored: Mon Jan 20 23:11:45 2014 -0800
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Mon Jan 20 23:11:45 2014 -0800

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/storage/BlockObjectWriter.scala   | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a917a87e/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
index 924ec7c..530712b 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
@@ -146,7 +146,8 @@ private[spark] class DiskBlockObjectWriter(
 
   override def commit(): Long = {
     if (initialized) {
-      // NOTE: Flush the serializer first and then the compressed/buffered output stream
+      // NOTE: Because Kryo doesn't flush the underlying stream we explicitly flush both the
+      //       serializer stream and the lower level stream.
       objOut.flush()
       bs.flush()
       val prevPos = lastValidPosition
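
The ordering the new comment pins down, reduced to its two calls (objOut
and bs as in the diff above): Kryo's serialization stream buffers
internally and, unlike java.io's object streams, does not flush the stream
beneath it, so both layers are flushed explicitly, top layer first.

    import java.io.OutputStream
    import org.apache.spark.serializer.SerializationStream

    def flushBothLayers(objOut: SerializationStream, bs: OutputStream): Unit = {
      objOut.flush()  // drain the serializer's internal buffer into bs
      bs.flush()      // then drain the compressed/buffered stream to disk
    }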


[02/10] git commit: Minor fixes

Posted by pw...@apache.org.
Minor fixes


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/54867e95
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/54867e95
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/54867e95

Branch: refs/heads/master
Commit: 54867e9566c81d2f60e7a5ffb52a8d2204eb47f8
Parents: cdb003e
Author: Patrick Wendell <pw...@gmail.com>
Authored: Mon Jan 20 18:33:21 2014 -0800
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Mon Jan 20 18:33:21 2014 -0800

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/storage/BlockObjectWriter.scala    | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/54867e95/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
index 48cec4b..924ec7c 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
@@ -138,6 +138,7 @@ private[spark] class DiskBlockObjectWriter(
       fos = null
       ts = null
       objOut = null
+      initialized = false
     }
   }
 
@@ -175,7 +176,6 @@ private[spark] class DiskBlockObjectWriter(
   }
 
   override def fileSegment(): FileSegment = {
-    val bytesWritten = lastValidPosition - initialPosition
     new FileSegment(file, initialPosition, bytesWritten)
   }
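
Both hunks are small correctness fixes: fileSegment() now refers to the
writer's bytesWritten member instead of recomputing it locally, and
resetting the flag in close() makes a second close() a no-op rather than a
pass over already-nulled streams. The flag pattern in isolation
(illustrative only):

    class GuardedWriter {
      private var initialized = false
      def open(): Unit = { /* open streams */ initialized = true }
      def close(): Unit = {
        if (initialized) {
          // flush and close streams, drop the references
          initialized = false  // a second close() now falls through harmlessly
        }
      }
    }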
 


[07/10] git commit: Avoid matching attempt files in the checkpoint

Posted by pw...@apache.org.
Avoid matching attempt files in the checkpoint


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/d46df96d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/d46df96d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/d46df96d

Branch: refs/heads/master
Commit: d46df96de3e3b4fb69541aa5cfa2a811c8b860eb
Parents: de526ad
Author: Patrick Wendell <pw...@gmail.com>
Authored: Mon Jan 20 20:02:02 2014 -0800
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Mon Jan 20 20:03:23 2014 -0800

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d46df96d/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
index 83109d1..30e578d 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
@@ -43,8 +43,8 @@ class CheckpointRDD[T: ClassTag](sc: SparkContext, val checkpointPath: String)
     val numPartitions =
     // listStatus can throw exception if path does not exist.
     if (fs.exists(cpath)) {
-      val dirContents = fs.listStatus(cpath)
-      val partitionFiles = dirContents.map(_.getPath.toString).filter(_.contains("part-")).sorted
+      val dirContents = fs.listStatus(cpath).map(_.getPath)
+      val partitionFiles = dirContents.filter(_.getName.startsWith("part-")).map(_.toString).sorted
       val numPart =  partitionFiles.size
       if (numPart > 0 && (! partitionFiles(0).endsWith(CheckpointRDD.splitIdToFile(0)) ||
           ! partitionFiles(numPart-1).endsWith(CheckpointRDD.splitIdToFile(numPart-1)))) {
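
The old filter matched on the full path string, so an in-progress attempt
file, which still contains "part-" somewhere in its path, slipped through
the substring test; matching on the file name's prefix excludes it. Side by
side (file names here are illustrative of the dot-prefixed attempt-file
convention):

    import org.apache.hadoop.fs.Path

    val finished = new Path("/ckpt/rdd-42/part-00000")
    val attempt  = new Path("/ckpt/rdd-42/.part-00000-attempt-3")

    def oldMatch(p: Path) = p.toString.contains("part-")
    def newMatch(p: Path) = p.getName.startsWith("part-")

    assert(oldMatch(finished) && oldMatch(attempt))    // old: both match
    assert(newMatch(finished) && !newMatch(attempt))   // new: only the real file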


[04/10] git commit: Force use of LZF when spilling data

Posted by pw...@apache.org.
Force use of LZF when spilling data


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/c324ac10
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/c324ac10
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/c324ac10

Branch: refs/heads/master
Commit: c324ac10ee208c53dabd54e5c0e1885aea456811
Parents: 1b29914
Author: Patrick Wendell <pw...@gmail.com>
Authored: Mon Jan 20 19:00:48 2014 -0800
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Mon Jan 20 19:00:48 2014 -0800

----------------------------------------------------------------------
 .../util/collection/ExternalAppendOnlyMap.scala | 42 +++++++++++++++++---
 docs/configuration.md                           |  4 +-
 2 files changed, 39 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/c324ac10/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 8df8b4f..792f29d 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -20,14 +20,15 @@ package org.apache.spark.util.collection
 import java.io._
 import java.util.Comparator
 
-import it.unimi.dsi.fastutil.io.FastBufferedInputStream
-
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
+import it.unimi.dsi.fastutil.io.FastBufferedInputStream
+
 import org.apache.spark.{Logging, SparkEnv}
-import org.apache.spark.serializer.{KryoDeserializationStream, KryoSerializationStream, Serializer}
-import org.apache.spark.storage.{BlockId, BlockManager, DiskBlockManager, DiskBlockObjectWriter}
+import org.apache.spark.io.LZFCompressionCodec
+import org.apache.spark.serializer.{KryoDeserializationStream, Serializer}
+import org.apache.spark.storage.{BlockId, BlockManager, DiskBlockObjectWriter}
 
 /**
  * An append-only map that spills sorted content to disk when there is insufficient space for it
@@ -153,9 +154,38 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
       .format(mapSize / (1024 * 1024), spillCount, if (spillCount > 1) "s" else ""))
     val (blockId, file) = diskBlockManager.createTempBlock()
 
-    val compressStream: OutputStream => OutputStream = blockManager.wrapForCompression(blockId, _)
+    /* IMPORTANT NOTE: To avoid having to keep large object graphs in memory, this approach
+    *  closes and re-opens serialization and compression streams within each file. This makes some
+     * assumptions about the way that serialization and compression streams work, specifically:
+     *
+     * 1) The serializer input streams do not pre-fetch data from the underlying stream.
+     *
+     * 2) Several compression streams can be opened, written to, and flushed on the write path
+     *    while only one compression input stream is created on the read path
+     *
+     * In practice (1) is only true for Java, so we add a special fix below to make it work for
+     * Kryo. (2) is only true for LZF and not Snappy, so we coerce this to use LZF.
+     *
+     * To avoid making these assumptions we should create an intermediate stream that batches
+     * objects and sends an EOF to the higher layer streams to make sure they never prefetch data.
+     * This is a bit tricky because, within each segment, you'd need to track the total number
+     * of bytes written and then re-wind and write it at the beginning of the segment. This will
+     * most likely require using the file channel API.
+     */
+
+    val codec = new LZFCompressionCodec(sparkConf)
+
+    def wrapForCompression(outputStream: OutputStream) = {
+      blockManager.shouldCompress(blockId) match {
+        case true =>
+          codec.compressedOutputStream(outputStream)
+        case false =>
+          outputStream
+      }
+    }
+
     def getNewWriter = new DiskBlockObjectWriter(blockId, file, serializer, fileBufferSize,
-      compressStream, syncWrites)
+      wrapForCompression, syncWrites)
 
     var writer = getNewWriter
     var objectsWritten = 0

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/c324ac10/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index 4c2e9cc..be548e3 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -158,7 +158,9 @@ Apart from these, the following properties are also available, and may be useful
   <td>spark.shuffle.spill.compress</td>
   <td>true</td>
   <td>
-    Whether to compress data spilled during shuffles.
+    Whether to compress data spilled during shuffles. If enabled, spill compression
+    always uses the `org.apache.spark.io.LZFCompressionCodec` codec, 
+    regardless of the value of `spark.io.compression.codec`.
   </td>
 </tr>
 <tr>
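
Assumption (2) in the long comment is the reason for the coercion: LZF
output is a sequence of self-contained chunks, so several compression
streams written one after another into the same file can be read back by a
single input stream, which Snappy's stream format does not allow. A sketch
under that assumption, using the Ning streams that LZFCompressionCodec
wraps:

    import java.io.{ByteArrayOutputStream, FileInputStream, FileOutputStream}
    import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}

    val file = java.io.File.createTempFile("spill", ".lzf")
    for (batch <- Seq("first", "second")) {
      val out = new LZFOutputStream(new FileOutputStream(file, true)) // append
      out.write(batch.getBytes("UTF-8"))
      out.close()  // each batch becomes an independent compressed segment
    }

    val in = new LZFInputStream(new FileInputStream(file))
    val recovered = new ByteArrayOutputStream()
    val buf = new Array[Byte](64)
    var n = in.read(buf)
    while (n != -1) { recovered.write(buf, 0, n); n = in.read(buf) }
    in.close()
    assert(recovered.toString("UTF-8") == "firstsecond") // read spans segments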


[06/10] git commit: Remove shuffle files if they are still present on a machine.

Posted by pw...@apache.org.
Remove shuffle files if they are still present on a machine.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/de526ad5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/de526ad5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/de526ad5

Branch: refs/heads/master
Commit: de526ad52766e7f2e597d572e281e2ba2ee52b46
Parents: f84400e
Author: Patrick Wendell <pw...@gmail.com>
Authored: Mon Jan 20 19:11:22 2014 -0800
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Mon Jan 20 19:11:22 2014 -0800

----------------------------------------------------------------------
 .../org/apache/spark/storage/ShuffleBlockManager.scala | 13 ++++++++++---
 1 file changed, 10 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/de526ad5/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
index e2b2429..173c329 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
@@ -23,10 +23,11 @@ import java.util.concurrent.atomic.AtomicInteger
 
 import scala.collection.JavaConversions._
 
+import org.apache.spark.Logging
 import org.apache.spark.serializer.Serializer
-import org.apache.spark.util.{MetadataCleanerType, MetadataCleaner, TimeStampedHashMap}
-import org.apache.spark.util.collection.{PrimitiveKeyOpenHashMap, PrimitiveVector}
 import org.apache.spark.storage.ShuffleBlockManager.ShuffleFileGroup
+import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, TimeStampedHashMap}
+import org.apache.spark.util.collection.{PrimitiveKeyOpenHashMap, PrimitiveVector}
 
 /** A group of writers for a ShuffleMapTask, one writer per reducer. */
 private[spark] trait ShuffleWriterGroup {
@@ -58,7 +59,7 @@ private[spark] trait ShuffleWriterGroup {
  * files within a ShuffleFileGroups associated with the block's reducer.
  */
 private[spark]
-class ShuffleBlockManager(blockManager: BlockManager) {
+class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
   def conf = blockManager.conf
 
   // Turning off shuffle file consolidation causes all shuffle Blocks to get their own file.
@@ -106,6 +107,12 @@ class ShuffleBlockManager(blockManager: BlockManager) {
         Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
           val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
           val blockFile = blockManager.diskBlockManager.getFile(blockId)
+          // Because of previous failures, the shuffle file may already exist on this machine.
+          // If so, remove it.
+          if (blockFile.exists()) {
+            val removed = blockFile.delete()
+            logInfo(s"Removed existing shuffle file $blockFile successfully: $removed")
+          }
           blockManager.getDiskWriter(blockId, blockFile, serializer, bufferSize)
         }
       }
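
One detail worth noting: java.io.File.delete() reports failure through its
return value, not an exception, which is why the result is captured and
logged here; commit a9bcc980 above refines this into separate info/warn
messages. The pattern in isolation (path illustrative):

    import java.io.File

    val blockFile = new File("/tmp/shuffle_0_0_0")
    if (blockFile.exists() && !blockFile.delete()) {
      System.err.println(s"Failed to remove existing shuffle file $blockFile")
    }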