You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2013/12/05 21:33:51 UTC

git commit: Merge pull request #228 from pwendell/master

Updated Branches:
  refs/heads/branch-0.8 27212addd -> 47fce43cf


Merge pull request #228 from pwendell/master

Document missing configs and set shuffle consolidation to false.
(cherry picked from commit 5d460253d6080d871cb71efb112ea17be0873771)

Signed-off-by: Patrick Wendell <pw...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/47fce43c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/47fce43c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/47fce43c

Branch: refs/heads/branch-0.8
Commit: 47fce43cfddb22b7173d9d12f815a9a05ffd1ca0
Parents: 27212ad
Author: Patrick Wendell <pw...@gmail.com>
Authored: Thu Dec 5 12:31:24 2013 -0800
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Thu Dec 5 12:33:02 2013 -0800

----------------------------------------------------------------------
 .../spark/storage/ShuffleBlockManager.scala     |  2 +-
 .../spark/storage/DiskBlockManagerSuite.scala   | 14 ++++++--
 docs/configuration.md                           | 37 +++++++++++++++++++-
 3 files changed, 49 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/47fce43c/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
index 2f1b049..e828e1d 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
@@ -62,7 +62,7 @@ class ShuffleBlockManager(blockManager: BlockManager) {
   // Turning off shuffle file consolidation causes all shuffle Blocks to get their own file.
   // TODO: Remove this once the shuffle file consolidation feature is stable.
   val consolidateShuffleFiles =
-    System.getProperty("spark.shuffle.consolidateFiles", "true").toBoolean
+    System.getProperty("spark.shuffle.consolidateFiles", "false").toBoolean
 
   private val bufferSize = System.getProperty("spark.shuffle.file.buffer.kb", "100").toInt * 1024
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/47fce43c/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
index 0b90563..ef4c4c0 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
@@ -5,9 +5,9 @@ import java.io.{FileWriter, File}
 import scala.collection.mutable
 
 import com.google.common.io.Files
-import org.scalatest.{BeforeAndAfterEach, FunSuite}
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}
 
-class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach {
+class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach with BeforeAndAfterAll {
 
   val rootDir0 = Files.createTempDir()
   rootDir0.deleteOnExit()
@@ -16,6 +16,12 @@ class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach {
   val rootDirs = rootDir0.getName + "," + rootDir1.getName
   println("Created root dirs: " + rootDirs)
 
+  // This suite focuses primarily on consolidation features,
+  // so we coerce consolidation if not already enabled.
+  val consolidateProp = "spark.shuffle.consolidateFiles"
+  val oldConsolidate = Option(System.getProperty(consolidateProp))
+  System.setProperty(consolidateProp, "true")
+
   val shuffleBlockManager = new ShuffleBlockManager(null) {
     var idToSegmentMap = mutable.Map[ShuffleBlockId, FileSegment]()
     override def getBlockLocation(id: ShuffleBlockId) = idToSegmentMap(id)
@@ -23,6 +29,10 @@ class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach {
 
   var diskBlockManager: DiskBlockManager = _
 
+  override def afterAll() {
+    oldConsolidate.map(c => System.setProperty(consolidateProp, c))
+  }
+
   override def beforeEach() {
     diskBlockManager = new DiskBlockManager(shuffleBlockManager, rootDirs)
     shuffleBlockManager.idToSegmentMap.clear()

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/47fce43c/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index 97183ba..22abe1c 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -327,7 +327,42 @@ Apart from these, the following properties are also available, and may be useful
     Too large a value decreases parallelism during broadcast (makes it slower); however, if it is too small, <code>BlockManager</code> might take a performance hit.
   </td>
 </tr>
-
+<tr>
+  <td>spark.shuffle.consolidateFiles</td>
+  <td>false</td>
+  <td>
+    If set to "true", consolidates intermediate files created during a shuffle. Creating fewer files can improve filesystem performance if you run shuffles with large numbers of reduce tasks.
+  </td>
+</tr>
+
+<tr>
+  <td>spark.speculation</td>
+  <td>false</td>
+  <td>
+    If set to "true", performs speculative execution of tasks. This means if one or more tasks are running slowly in a stage, they will be re-launched.
+  </td>
+</tr>
+<tr>
+  <td>spark.speculation.interval</td>
+  <td>100</td>
+  <td>
+    How often Spark will check for tasks to speculate, in milliseconds.
+  </td>
+</tr>
+<tr>
+  <td>spark.speculation.quantile</td>
+  <td>0.75</td>
+  <td>
+    Fraction of tasks which must be complete before speculation is enabled for a particular stage.
+  </td>
+</tr>
+<tr>
+  <td>spark.speculation.multiplier</td>
+  <td>1.5</td>
+  <td>
+    How many times slower a task is than the median to be considered for speculation.
+  </td>
+</tr>
 </table>
 
 # Environment Variables