You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/11 01:26:01 UTC
[44/50] git commit: Get SparkConf from SparkEnv, rather than creating new ones
Get SparkConf from SparkEnv, rather than creating new ones
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/80ba9f8b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/80ba9f8b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/80ba9f8b
Branch: refs/heads/master
Commit: 80ba9f8ba06e623600469ddb3e59dffcbedee1d0
Parents: 4de9c95
Author: Andrew Or <an...@gmail.com>
Authored: Tue Jan 7 12:44:22 2014 -0800
Committer: Andrew Or <an...@gmail.com>
Committed: Tue Jan 7 12:44:22 2014 -0800
----------------------------------------------------------------------
core/src/main/scala/org/apache/spark/Aggregator.scala | 2 +-
core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala | 6 +++---
.../apache/spark/util/collection/ExternalAppendOnlyMap.scala | 4 ++--
3 files changed, 6 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/80ba9f8b/core/src/main/scala/org/apache/spark/Aggregator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala
index 292e32e..08a96b0 100644
--- a/core/src/main/scala/org/apache/spark/Aggregator.scala
+++ b/core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -31,7 +31,7 @@ case class Aggregator[K, V, C] (
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C) {
- private val sparkConf = new SparkConf()
+ private val sparkConf = SparkEnv.get.conf
private val externalSorting = sparkConf.get("spark.shuffle.externalSorting", "false").toBoolean
def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]) : Iterator[(K, C)] = {
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/80ba9f8b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index 7dc7094..b7c7773 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -22,7 +22,7 @@ import java.io.{ObjectOutputStream, IOException}
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.{InterruptibleIterator, Partition, Partitioner, SparkEnv, TaskContext}
-import org.apache.spark.{Dependency, OneToOneDependency, ShuffleDependency, SparkConf}
+import org.apache.spark.{Dependency, OneToOneDependency, ShuffleDependency}
import org.apache.spark.util.collection.{ExternalAppendOnlyMap, AppendOnlyMap}
private[spark] sealed trait CoGroupSplitDep extends Serializable
@@ -66,7 +66,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
private type CoGroupValue = (Any, Int) // Int is dependency number
private type CoGroupCombiner = Seq[CoGroup]
- private val sparkConf = new SparkConf()
+ private val sparkConf = SparkEnv.get.conf
private var serializerClass: String = null
def setSerializer(cls: String): CoGroupedRDD[K] = {
@@ -122,7 +122,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
case ShuffleCoGroupSplitDep(shuffleId) => {
// Read map outputs of shuffle
val fetcher = SparkEnv.get.shuffleFetcher
- val ser = SparkEnv.get.serializerManager.get(serializerClass, SparkEnv.get.conf)
+ val ser = SparkEnv.get.serializerManager.get(serializerClass, sparkConf)
val it = fetcher.fetch[Product2[K, Any]](shuffleId, split.index, context, ser)
rddIterators += ((it, depNum))
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/80ba9f8b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index c348168..a5897e8 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -24,7 +24,7 @@ import it.unimi.dsi.fastutil.io.FastBufferedInputStream
import scala.collection.mutable.{ArrayBuffer, PriorityQueue}
-import org.apache.spark.{SparkConf, Logging, SparkEnv}
+import org.apache.spark.{Logging, SparkEnv}
import org.apache.spark.serializer.Serializer
import org.apache.spark.storage.{DiskBlockManager, DiskBlockObjectWriter}
@@ -68,7 +68,7 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
private var currentMap = new SizeTrackingAppendOnlyMap[K, C]
private val spilledMaps = new ArrayBuffer[DiskMapIterator]
- private val sparkConf = new SparkConf()
+ private val sparkConf = SparkEnv.get.conf
// Collective memory threshold shared across all running tasks
private val maxMemoryThreshold = {