Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/11 01:25:56 UTC

[39/50] git commit: Refactor using SparkConf

Refactor using SparkConf
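
The change replaces direct System.getProperty reads with lookups on a SparkConf instance. A minimal sketch of that pattern, assuming a plain new SparkConf() that still picks up any spark.* system properties (the variable names below are illustrative, not taken from the diff):

    import org.apache.spark.SparkConf

    // Before this commit: configuration was read from JVM system properties.
    val oldFlag = System.getProperty("spark.shuffle.externalSorting", "false").toBoolean

    // After: the same keys are read through SparkConf, which by default
    // loads any spark.* system properties that are set.
    val sparkConf = new SparkConf()
    val externalSorting = sparkConf.get("spark.shuffle.externalSorting", "false").toBoolean
    val bufferSizeMb = sparkConf.getLong("spark.shuffle.buffer.mb", 1024)
    val bufferFraction = sparkConf.getDouble("spark.shuffle.buffer.fraction", 0.8)

In the test suite below, the conf is instead built with new SparkConf(false), so it does not inherit system properties, and is passed directly to the SparkContext constructor.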


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/838b0e7d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/838b0e7d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/838b0e7d

Branch: refs/heads/master
Commit: 838b0e7d154699291f9915d400c59a3580173d01
Parents: df413e9
Author: Andrew Or <an...@gmail.com>
Authored: Fri Jan 3 16:13:40 2014 -0800
Committer: Andrew Or <an...@gmail.com>
Committed: Fri Jan 3 16:13:40 2014 -0800

----------------------------------------------------------------------
 .../src/main/scala/org/apache/spark/Aggregator.scala |  5 +++--
 .../scala/org/apache/spark/rdd/CoGroupedRDD.scala    |  5 +++--
 .../util/collection/ExternalAppendOnlyMap.scala      | 15 +++++++++------
 .../util/collection/ExternalAppendOnlyMapSuite.scala | 15 ++++++---------
 4 files changed, 21 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/838b0e7d/core/src/main/scala/org/apache/spark/Aggregator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala
index c408d5f..c9e3e8e 100644
--- a/core/src/main/scala/org/apache/spark/Aggregator.scala
+++ b/core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -33,8 +33,10 @@ case class Aggregator[K, V, C: ClassTag] (
     mergeValue: (C, V) => C,
     mergeCombiners: (C, C) => C) {
 
+  private val sparkConf = new SparkConf()
+  private val externalSorting = sparkConf.get("spark.shuffle.externalSorting", "false").toBoolean
+
   def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]) : Iterator[(K, C)] = {
-    val externalSorting = System.getProperty("spark.shuffle.externalSorting", "false").toBoolean
     if (!externalSorting) {
       val combiners = new AppendOnlyMap[K,C]
       var kv: Product2[K, V] = null
@@ -58,7 +60,6 @@ case class Aggregator[K, V, C: ClassTag] (
   }
 
   def combineCombinersByKey(iter: Iterator[(K, C)]) : Iterator[(K, C)] = {
-    val externalSorting = System.getProperty("spark.shuffle.externalSorting", "false").toBoolean
     if (!externalSorting) {
       val combiners = new AppendOnlyMap[K,C]
       var kc: Product2[K, C] = null

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/838b0e7d/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index 44494c7..7dc7094 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -22,7 +22,7 @@ import java.io.{ObjectOutputStream, IOException}
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.{InterruptibleIterator, Partition, Partitioner, SparkEnv, TaskContext}
-import org.apache.spark.{Dependency, OneToOneDependency, ShuffleDependency}
+import org.apache.spark.{Dependency, OneToOneDependency, ShuffleDependency, SparkConf}
 import org.apache.spark.util.collection.{ExternalAppendOnlyMap, AppendOnlyMap}
 
 private[spark] sealed trait CoGroupSplitDep extends Serializable
@@ -66,6 +66,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
   private type CoGroupValue = (Any, Int)  // Int is dependency number
   private type CoGroupCombiner = Seq[CoGroup]
 
+  private val sparkConf = new SparkConf()
   private var serializerClass: String = null
 
   def setSerializer(cls: String): CoGroupedRDD[K] = {
@@ -106,7 +107,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
 
   override def compute(s: Partition, context: TaskContext): Iterator[(K, CoGroupCombiner)] = {
 
-    val externalSorting = System.getProperty("spark.shuffle.externalSorting", "false").toBoolean
+    val externalSorting = sparkConf.get("spark.shuffle.externalSorting", "false").toBoolean
     val split = s.asInstanceOf[CoGroupPartition]
     val numRdds = split.deps.size
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/838b0e7d/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 9e147fe..68a2319 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -24,7 +24,7 @@ import it.unimi.dsi.fastutil.io.FastBufferedInputStream
 
 import scala.collection.mutable.{ArrayBuffer, PriorityQueue}
 
-import org.apache.spark.{Logging, SparkEnv}
+import org.apache.spark.{SparkConf, Logging, SparkEnv}
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.storage.{DiskBlockManager, DiskBlockObjectWriter}
 
@@ -57,14 +57,16 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
 
   private var currentMap = new SizeTrackingAppendOnlyMap[K, C]
   private val spilledMaps = new ArrayBuffer[DiskIterator]
+
+  private val sparkConf = new SparkConf()
   private val memoryThresholdMB = {
     // TODO: Turn this into a fraction of memory per reducer
-    val bufferSize = System.getProperty("spark.shuffle.buffer.mb", "1024").toLong
-    val bufferPercent = System.getProperty("spark.shuffle.buffer.fraction", "0.8").toFloat
+    val bufferSize = sparkConf.getLong("spark.shuffle.buffer.mb", 1024)
+    val bufferPercent = sparkConf.getDouble("spark.shuffle.buffer.fraction", 0.8)
     bufferSize * bufferPercent
   }
-  private val fileBufferSize =
-    System.getProperty("spark.shuffle.file.buffer.kb", "100").toInt * 1024
+  private val fileBufferSize = sparkConf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024
+  private val syncWrites = sparkConf.get("spark.shuffle.sync", "false").toBoolean
   private val comparator = new KCComparator[K, C]
   private val ser = serializer.newInstance()
   private var spillCount = 0
@@ -84,7 +86,8 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
     logWarning(s"In-memory KV map exceeded threshold of $memoryThresholdMB MB!")
     logWarning(s"Spilling to disk ($spillCount time"+(if (spillCount > 1) "s" else "")+" so far)")
     val (blockId, file) = diskBlockManager.createTempBlock()
-    val writer = new DiskBlockObjectWriter(blockId, file, serializer, fileBufferSize, identity)
+    val writer =
+      new DiskBlockObjectWriter(blockId, file, serializer, fileBufferSize, identity, syncWrites)
     try {
       val it = currentMap.destructiveSortedIterator(comparator)
       while (it.hasNext) {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/838b0e7d/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
index a18d466..6c93b1f 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
@@ -4,20 +4,17 @@ import scala.collection.mutable.ArrayBuffer
 
 import org.scalatest.{BeforeAndAfter, FunSuite}
 
-import org.apache.spark.{HashPartitioner, SparkContext, SparkEnv, LocalSparkContext}
+import org.apache.spark._
 import org.apache.spark.SparkContext.rddToPairRDDFunctions
 
 class ExternalAppendOnlyMapSuite extends FunSuite with BeforeAndAfter with LocalSparkContext {
 
   override def beforeEach() {
-    sc = new SparkContext("local", "test")
-    System.setProperty("spark.shuffle.externalSorting", "true")
-  }
-
-  after {
-    System.setProperty("spark.shuffle.externalSorting", "false")
-    System.setProperty("spark.shuffle.buffer.mb", "1024")
-    System.setProperty("spark.shuffle.buffer.fraction", "0.8")
+    val conf = new SparkConf(false)
+    conf.set("spark.shuffle.externalSorting", "true")
+    conf.set("spark.shuffle.buffer.mb", "1024")
+    conf.set("spark.shuffle.buffer.fraction", "0.8")
+    sc = new SparkContext("local", "test", conf)
   }
 
   val createCombiner: (Int => ArrayBuffer[Int]) = i => ArrayBuffer[Int](i)