Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/11 01:26:01 UTC

[44/50] git commit: Get SparkConf from SparkEnv, rather than creating new ones

Get SparkConf from SparkEnv, rather than creating new ones
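
All three call sites in this diff previously built a fresh SparkConf. A conf constructed with new SparkConf() loads defaults only from spark.* JVM system properties, so inside a running task it can miss settings applied programmatically to the driver's conf; SparkEnv.get.conf instead returns the configuration already carried by the current SparkEnv. Below is a minimal sketch of the pattern, not code from this commit: the ShuffleHelper class is hypothetical, while SparkEnv, SparkConf, and the spark.shuffle.externalSorting key are taken from the diff.

import org.apache.spark.{SparkConf, SparkEnv}

// Hypothetical component that reads a config value while a task is running.
class ShuffleHelper {
  // Before: a fresh SparkConf only picks up spark.* system properties,
  // so driver-side programmatic settings may be missing on executors.
  // private val sparkConf = new SparkConf()

  // After: reuse the conf of the running SparkEnv.
  private val sparkConf = SparkEnv.get.conf

  val externalSorting: Boolean =
    sparkConf.get("spark.shuffle.externalSorting", "false").toBoolean
}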


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/80ba9f8b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/80ba9f8b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/80ba9f8b

Branch: refs/heads/master
Commit: 80ba9f8ba06e623600469ddb3e59dffcbedee1d0
Parents: 4de9c95
Author: Andrew Or <an...@gmail.com>
Authored: Tue Jan 7 12:44:22 2014 -0800
Committer: Andrew Or <an...@gmail.com>
Committed: Tue Jan 7 12:44:22 2014 -0800

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/Aggregator.scala          | 2 +-
 core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala    | 6 +++---
 .../apache/spark/util/collection/ExternalAppendOnlyMap.scala   | 4 ++--
 3 files changed, 6 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/80ba9f8b/core/src/main/scala/org/apache/spark/Aggregator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala
index 292e32e..08a96b0 100644
--- a/core/src/main/scala/org/apache/spark/Aggregator.scala
+++ b/core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -31,7 +31,7 @@ case class Aggregator[K, V, C] (
     mergeValue: (C, V) => C,
     mergeCombiners: (C, C) => C) {
 
-  private val sparkConf = new SparkConf()
+  private val sparkConf = SparkEnv.get.conf
   private val externalSorting = sparkConf.get("spark.shuffle.externalSorting", "false").toBoolean
 
   def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]) : Iterator[(K, C)] = {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/80ba9f8b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index 7dc7094..b7c7773 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -22,7 +22,7 @@ import java.io.{ObjectOutputStream, IOException}
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.{InterruptibleIterator, Partition, Partitioner, SparkEnv, TaskContext}
-import org.apache.spark.{Dependency, OneToOneDependency, ShuffleDependency, SparkConf}
+import org.apache.spark.{Dependency, OneToOneDependency, ShuffleDependency}
 import org.apache.spark.util.collection.{ExternalAppendOnlyMap, AppendOnlyMap}
 
 private[spark] sealed trait CoGroupSplitDep extends Serializable
@@ -66,7 +66,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
   private type CoGroupValue = (Any, Int)  // Int is dependency number
   private type CoGroupCombiner = Seq[CoGroup]
 
-  private val sparkConf = new SparkConf()
+  private val sparkConf = SparkEnv.get.conf
   private var serializerClass: String = null
 
   def setSerializer(cls: String): CoGroupedRDD[K] = {
@@ -122,7 +122,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
       case ShuffleCoGroupSplitDep(shuffleId) => {
         // Read map outputs of shuffle
         val fetcher = SparkEnv.get.shuffleFetcher
-        val ser = SparkEnv.get.serializerManager.get(serializerClass, SparkEnv.get.conf)
+        val ser = SparkEnv.get.serializerManager.get(serializerClass, sparkConf)
         val it = fetcher.fetch[Product2[K, Any]](shuffleId, split.index, context, ser)
         rddIterators += ((it, depNum))
       }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/80ba9f8b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index c348168..a5897e8 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -24,7 +24,7 @@ import it.unimi.dsi.fastutil.io.FastBufferedInputStream
 
 import scala.collection.mutable.{ArrayBuffer, PriorityQueue}
 
-import org.apache.spark.{SparkConf, Logging, SparkEnv}
+import org.apache.spark.{Logging, SparkEnv}
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.storage.{DiskBlockManager, DiskBlockObjectWriter}
 
@@ -68,7 +68,7 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
 
   private var currentMap = new SizeTrackingAppendOnlyMap[K, C]
   private val spilledMaps = new ArrayBuffer[DiskMapIterator]
-  private val sparkConf = new SparkConf()
+  private val sparkConf = SparkEnv.get.conf
 
   // Collective memory threshold shared across all running tasks
   private val maxMemoryThreshold = {