You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/02 06:29:44 UTC

[06/33] git commit: Various fixes to configuration code

Various fixes to configuration code

- Got rid of global SparkContext.globalConf
- Pass SparkConf to serializers and compression codecs
- Made SparkConf public instead of private[spark]
- Improved API of SparkContext and SparkConf
- Switched executor environment vars to be passed through SparkConf
- Fixed some places that were still using system properties
- Fixed some tests, though others are still failing

This commit still fails several tests in core, repl and streaming, likely
due to properties not being set or cleared correctly (some of these tests
pass when run in isolation).


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/642029e7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/642029e7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/642029e7

Branch: refs/heads/master
Commit: 642029e7f43322f84abe4f7f36bb0b1b95d8101d
Parents: 2573add
Author: Matei Zaharia <ma...@databricks.com>
Authored: Sat Dec 28 17:13:15 2013 -0500
Committer: Matei Zaharia <ma...@databricks.com>
Committed: Sat Dec 28 17:13:15 2013 -0500

----------------------------------------------------------------------
 .../scala/org/apache/spark/Accumulators.scala   |   8 +-
 .../org/apache/spark/MapOutputTracker.scala     |   4 +-
 .../scala/org/apache/spark/Partitioner.scala    |   6 +-
 .../main/scala/org/apache/spark/SparkConf.scala | 158 +++++++++++++++----
 .../scala/org/apache/spark/SparkContext.scala   | 138 ++++++++--------
 .../main/scala/org/apache/spark/SparkEnv.scala  |  11 +-
 .../spark/api/java/JavaSparkContext.scala       |  15 +-
 .../org/apache/spark/api/python/PythonRDD.scala |   6 +-
 .../apache/spark/broadcast/HttpBroadcast.scala  |  33 ++--
 .../spark/broadcast/TorrentBroadcast.scala      |  38 ++---
 .../spark/deploy/ApplicationDescription.scala   |   2 +-
 .../apache/spark/deploy/LocalSparkCluster.scala |   7 +-
 .../apache/spark/deploy/SparkHadoopUtil.scala   |  14 +-
 .../apache/spark/deploy/client/TestClient.scala |   9 +-
 .../org/apache/spark/deploy/master/Master.scala |  36 +++--
 .../deploy/master/SparkZooKeeperSession.scala   |   2 +-
 .../master/ZooKeeperLeaderElectionAgent.scala   |   2 +-
 .../master/ZooKeeperPersistenceEngine.scala     |   2 +-
 .../org/apache/spark/deploy/worker/Worker.scala |  14 +-
 .../executor/CoarseGrainedExecutorBackend.scala |   4 +-
 .../org/apache/spark/executor/Executor.scala    |  17 +-
 .../org/apache/spark/io/CompressionCodec.scala  |  13 +-
 .../spark/network/ConnectionManager.scala       |   4 +-
 .../org/apache/spark/network/ReceiverTest.scala |  12 +-
 .../org/apache/spark/network/SenderTest.scala   |  16 +-
 .../spark/network/netty/ShuffleCopier.scala     |   6 +-
 .../org/apache/spark/rdd/CheckpointRDD.scala    |   7 +-
 .../org/apache/spark/rdd/CoGroupedRDD.scala     |   2 +-
 .../org/apache/spark/rdd/ShuffledRDD.scala      |   2 +-
 .../org/apache/spark/rdd/SubtractedRDD.scala    |   2 +-
 .../apache/spark/scheduler/DAGScheduler.scala   |   5 +-
 .../spark/scheduler/InputFormatInfo.scala       |  14 +-
 .../org/apache/spark/scheduler/JobLogger.scala  |   2 +-
 .../org/apache/spark/scheduler/ResultTask.scala |   4 +-
 .../spark/scheduler/SchedulableBuilder.scala    |   2 +-
 .../apache/spark/scheduler/ShuffleMapTask.scala |   6 +-
 .../scheduler/cluster/ClusterScheduler.scala    |   8 +-
 .../cluster/ClusterTaskSetManager.scala         |  12 +-
 .../cluster/CoarseGrainedSchedulerBackend.scala |   9 +-
 .../scheduler/cluster/SchedulerBackend.scala    |   3 -
 .../cluster/SimrSchedulerBackend.scala          |   2 +-
 .../cluster/SparkDeploySchedulerBackend.scala   |   2 +-
 .../scheduler/cluster/TaskResultGetter.scala    |   2 +-
 .../mesos/CoarseMesosSchedulerBackend.scala     |   6 +-
 .../cluster/mesos/MesosSchedulerBackend.scala   |   6 +-
 .../spark/scheduler/local/LocalScheduler.scala  |   2 +-
 .../spark/serializer/JavaSerializer.scala       |   3 +-
 .../spark/serializer/KryoSerializer.scala       |  13 +-
 .../spark/serializer/SerializerManager.scala    |  12 +-
 .../spark/storage/BlockFetcherIterator.scala    |   2 +-
 .../org/apache/spark/storage/BlockManager.scala |  46 +++---
 .../spark/storage/BlockManagerMaster.scala      |   4 +-
 .../spark/storage/BlockManagerMasterActor.scala |   4 +-
 .../apache/spark/storage/DiskBlockManager.scala |   2 +-
 .../spark/storage/ShuffleBlockManager.scala     |   9 +-
 .../spark/storage/StoragePerfTester.scala       |   2 +-
 .../apache/spark/storage/ThreadingTest.scala    |   6 +-
 .../apache/spark/ui/UIWorkloadGenerator.scala   |  17 +-
 .../org/apache/spark/ui/env/EnvironmentUI.scala |   2 +-
 .../spark/ui/jobs/JobProgressListener.scala     |   4 +-
 .../scala/org/apache/spark/util/AkkaUtils.scala |  18 +--
 .../org/apache/spark/util/MetadataCleaner.scala |  33 ++--
 .../org/apache/spark/util/SizeEstimator.scala   |  17 +-
 .../scala/org/apache/spark/util/Utils.scala     |  14 +-
 .../apache/spark/io/CompressionCodecSuite.scala |   8 +-
 .../cluster/ClusterTaskSetManagerSuite.scala    |   2 +-
 .../spark/serializer/KryoSerializerSuite.scala  |  14 +-
 .../spark/storage/BlockManagerSuite.scala       |   8 +-
 .../apache/spark/util/SizeEstimatorSuite.scala  |   2 -
 .../examples/bagel/WikipediaPageRank.scala      |   4 +-
 .../bagel/WikipediaPageRankStandalone.scala     |   4 +-
 .../apache/spark/mllib/recommendation/ALS.scala |  10 +-
 .../spark/deploy/yarn/ApplicationMaster.scala   |  44 +++---
 .../org/apache/spark/deploy/yarn/Client.scala   |  38 ++---
 .../spark/deploy/yarn/ClientArguments.scala     |   2 +-
 .../org/apache/spark/repl/SparkILoop.scala      |  16 +-
 .../org/apache/spark/repl/SparkIMain.scala      |   4 +-
 .../org/apache/spark/streaming/Checkpoint.scala |  22 +--
 .../org/apache/spark/streaming/DStream.scala    |   2 +-
 .../org/apache/spark/streaming/Scheduler.scala  |  10 +-
 .../spark/streaming/StreamingContext.scala      |  25 +--
 .../streaming/dstream/NetworkInputDStream.scala |   6 +-
 .../spark/streaming/util/RawTextSender.scala    |   4 +-
 .../spark/streaming/InputStreamsSuite.scala     |   6 +-
 .../apache/spark/streaming/TestSuiteBase.scala  |   6 +-
 .../spark/deploy/yarn/ApplicationMaster.scala   |  56 +++----
 .../org/apache/spark/deploy/yarn/Client.scala   |  50 +++---
 .../spark/deploy/yarn/ClientArguments.scala     |   2 +-
 88 files changed, 692 insertions(+), 536 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/642029e7/core/src/main/scala/org/apache/spark/Accumulators.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Accumulators.scala b/core/src/main/scala/org/apache/spark/Accumulators.scala
index 6e922a6..5f73d23 100644
--- a/core/src/main/scala/org/apache/spark/Accumulators.scala
+++ b/core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -41,7 +41,7 @@ class Accumulable[R, T] (
     @transient initialValue: R,
     param: AccumulableParam[R, T])
   extends Serializable {
-  
+
   val id = Accumulators.newId
   @transient private var value_ = initialValue // Current value on master
   val zero = param.zero(initialValue)  // Zero value to be passed to workers
@@ -113,7 +113,7 @@ class Accumulable[R, T] (
   def setValue(newValue: R) {
     this.value = newValue
   }
- 
+
   // Called by Java when deserializing an object
   private def readObject(in: ObjectInputStream) {
     in.defaultReadObject()
@@ -177,7 +177,7 @@ class GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Ser
   def zero(initialValue: R): R = {
     // We need to clone initialValue, but it's hard to specify that R should also be Cloneable.
     // Instead we'll serialize it to a buffer and load it back.
-    val ser = new JavaSerializer().newInstance()
+    val ser = new JavaSerializer(new SparkConf(false)).newInstance()
     val copy = ser.deserialize[R](ser.serialize(initialValue))
     copy.clear()   // In case it contained stuff
     copy
@@ -215,7 +215,7 @@ private object Accumulators {
   val originals = Map[Long, Accumulable[_, _]]()
   val localAccums = Map[Thread, Map[Long, Accumulable[_, _]]]()
   var lastId: Long = 0
-  
+
   def newId: Long = synchronized {
     lastId += 1
     return lastId

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/642029e7/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 4520edb..cdae167 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -65,7 +65,7 @@ private[spark] class MapOutputTracker(conf: SparkConf) extends Logging {
   protected val epochLock = new java.lang.Object
 
   private val metadataCleaner =
-    new MetadataCleaner(MetadataCleanerType.MAP_OUTPUT_TRACKER, this.cleanup)
+    new MetadataCleaner(MetadataCleanerType.MAP_OUTPUT_TRACKER, this.cleanup, conf)
 
   // Send a message to the trackerActor and get its result within a default timeout, or
   // throw a SparkException if this fails.
@@ -129,7 +129,7 @@ private[spark] class MapOutputTracker(conf: SparkConf) extends Logging {
       if (fetchedStatuses == null) {
         // We won the race to fetch the output locs; do so
         logInfo("Doing the fetch; tracker actor = " + trackerActor)
-        val hostPort = Utils.localHostPort()
+        val hostPort = Utils.localHostPort(conf)
         // This try-finally prevents hangs due to timeouts:
         try {
           val fetchedBytes =

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/642029e7/core/src/main/scala/org/apache/spark/Partitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala
index 04c1eed..7cb545a 100644
--- a/core/src/main/scala/org/apache/spark/Partitioner.scala
+++ b/core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -32,8 +32,6 @@ abstract class Partitioner extends Serializable {
 }
 
 object Partitioner {
-
-  import SparkContext.{globalConf => conf}
   /**
    * Choose a partitioner to use for a cogroup-like operation between a number of RDDs.
    *
@@ -54,7 +52,7 @@ object Partitioner {
     for (r <- bySize if r.partitioner != None) {
       return r.partitioner.get
     }
-    if (conf.getOrElse("spark.default.parallelism", null) != null) {
+    if (rdd.context.conf.getOrElse("spark.default.parallelism", null) != null) {
       return new HashPartitioner(rdd.context.defaultParallelism)
     } else {
       return new HashPartitioner(bySize.head.partitions.size)
@@ -92,7 +90,7 @@ class HashPartitioner(partitions: Int) extends Partitioner {
 class RangePartitioner[K <% Ordered[K]: ClassTag, V](
     partitions: Int,
     @transient rdd: RDD[_ <: Product2[K,V]],
-    private val ascending: Boolean = true) 
+    private val ascending: Boolean = true)
   extends Partitioner {
 
   // An array of upper bounds for the first (partitions - 1) partitions

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/642029e7/core/src/main/scala/org/apache/spark/SparkConf.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 9a4eefa..185ddb1 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -1,71 +1,159 @@
 package org.apache.spark
 
-import scala.collection.JavaConversions._
-import scala.collection.concurrent.TrieMap
+import scala.collection.JavaConverters._
+import scala.collection.mutable.HashMap
 
 import com.typesafe.config.ConfigFactory
 
-private[spark] class SparkConf(loadClasspathRes: Boolean = true) extends Serializable {
-  @transient lazy val config = ConfigFactory.systemProperties()
-    .withFallback(ConfigFactory.parseResources("spark.conf"))
-  // TODO this should actually be synchronized
-  private val configMap = TrieMap[String, String]()
+/**
+ * Configuration for a Spark application. Used to set various Spark parameters as key-value pairs.
+ *
+ * Most of the time, you would create a SparkConf object with `new SparkConf()`, which will load
+ * values from both the `spark.*` Java system properties and any `spark.conf` on your application's
+ * classpath (if it has one). In this case, system properties take priority over `spark.conf`, and
+ * any parameters you set directly on the `SparkConf` object take priority over both of those.
+ *
+ * For unit tests, you can also call `new SparkConf(false)` to skip loading external settings and
+ * get the same configuration no matter what is on the classpath.
+ *
+ * @param loadDefaults whether to load values from the system properties and classpath
+ */
+class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable {
 
-  if (loadClasspathRes && !config.entrySet().isEmpty) {
-    for (e <- config.entrySet()) {
-      configMap += ((e.getKey, e.getValue.unwrapped().toString))
+  /** Create a SparkConf that loads defaults from system properties and the classpath */
+  def this() = this(true)
+
+  private val settings = new HashMap[String, String]()
+
+  if (loadDefaults) {
+    val typesafeConfig = ConfigFactory.systemProperties()
+      .withFallback(ConfigFactory.parseResources("spark.conf"))
+    for (e <- typesafeConfig.entrySet().asScala) {
+      settings(e.getKey) = e.getValue.unwrapped.toString
     }
   }
 
-  def setMasterUrl(master: String) = {
-    if (master != null)
-      configMap += (("spark.master", master))
+  /** Set a configuration variable. */
+  def set(key: String, value: String): SparkConf = {
+    settings(key) = value
+    this
+  }
+
+  /**
+   * The master URL to connect to, such as "local" to run locally with one thread, "local[4]" to
+   * run locally with 4 cores, or "spark://master:7077" to run on a Spark standalone cluster.
+   */
+  def setMaster(master: String): SparkConf = {
+    if (master != null) {
+      settings("spark.master") = master
+    }
     this
   }
 
-  def setAppName(name: String) = {
-    if (name != null)
-      configMap += (("spark.appName", name))
+  /** Set a name for your application. Shown in the Spark web UI. */
+  def setAppName(name: String): SparkConf = {
+    if (name != null) {
+      settings("spark.appName") = name
+    }
     this
   }
 
-  def setJars(jars: Seq[String]) = {
-    if (!jars.isEmpty)
-      configMap += (("spark.jars", jars.mkString(",")))
+  /** Set JAR files to distribute to the cluster. */
+  def setJars(jars: Seq[String]): SparkConf = {
+    if (!jars.isEmpty) {
+      settings("spark.jars") = jars.mkString(",")
+    }
     this
   }
 
-  def set(k: String, value: String) = {
-    configMap += ((k, value))
+  /** Set JAR files to distribute to the cluster. (Java-friendly version.) */
+  def setJars(jars: Array[String]): SparkConf = {
+    if (!jars.isEmpty) {
+      settings("spark.jars") = jars.mkString(",")
+    }
     this
   }
 
-  def setSparkHome(home: String) = {
-    if (home != null)
-      configMap += (("spark.home", home))
+  /** Set an environment variable to be used when launching executors for this application. */
+  def setExecutorEnv(variable: String, value: String): SparkConf = {
+    settings("spark.executorEnv." + variable) = value
     this
   }
 
-  def set(map: Seq[(String, String)]) = {
-    if (map != null && !map.isEmpty)
-      configMap ++= map
+  /** Set multiple environment variables to be used when launching executors. */
+  def setExecutorEnv(variables: Seq[(String, String)]): SparkConf = {
+    for ((k, v) <- variables) {
+      setExecutorEnv(k, v)
+    }
     this
   }
 
-  def get(k: String): String = {
-    configMap(k)
+  /**
+   * Set multiple environment variables to be used when launching executors.
+   * (Java-friendly version.)
+   */
+  def setExecutorEnv(variables: Array[(String, String)]): SparkConf = {
+    for ((k, v) <- variables) {
+      setExecutorEnv(k, v)
+    }
+    this
   }
 
-  def getAllConfiguration = configMap.clone.entrySet().iterator
+  /**
+   * Set the location where Spark is installed on worker nodes. This is only needed on Mesos if
+   * you are not using `spark.executor.uri` to disseminate the Spark binary distribution.
+   */
+  def setSparkHome(home: String): SparkConf = {
+    if (home != null) {
+      settings("spark.home") = home
+    }
+    this
+  }
 
+  /** Set multiple parameters together */
+  def setAll(settings: Traversable[(String, String)]) = {
+    this.settings ++= settings
+    this
+  }
+
+  /** Set a parameter if it isn't already configured */
+  def setIfMissing(key: String, value: String): SparkConf = {
+    if (!settings.contains(key)) {
+      settings(key) = value
+    }
+    this
+  }
+
+  /** Get a parameter; throws an exception if it's not set */
+  def get(key: String): String = {
+    settings(key)
+  }
+
+  /** Get a parameter as an Option */
+  def getOption(key: String): Option[String] = {
+    settings.get(key)
+  }
+
+  /** Get all parameters as a list of pairs */
+  def getAll: Seq[(String, String)] = settings.clone().toSeq
+
+  /** Get a parameter, falling back to a default if not set */
   def getOrElse(k: String, defaultValue: String): String = {
-    configMap.getOrElse(k, defaultValue)
+    settings.getOrElse(k, defaultValue)
   }
 
-  override def clone: SparkConf = {
-    val conf = new SparkConf(false)
-    conf.set(configMap.toSeq)
-    conf
+  /** Get all executor environment variables set on this SparkConf */
+  def getExecutorEnv: Seq[(String, String)] = {
+    val prefix = "spark.executorEnv."
+    getAll.filter(pair => pair._1.startsWith(prefix))
+          .map(pair => (pair._1.substring(prefix.length), pair._2))
   }
 
+  /** Does the configuration contain a given parameter? */
+  def contains(key: String): Boolean = settings.contains(key)
+
+  /** Copy this object */
+  override def clone: SparkConf = {
+    new SparkConf(false).setAll(settings)
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/642029e7/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 4300b07..0567f7f 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -22,8 +22,7 @@ import java.net.URI
 import java.util.Properties
 import java.util.concurrent.atomic.AtomicInteger
 
-import scala.collection.{Map, immutable}
-import scala.collection.JavaConversions._
+import scala.collection.{Map, Set, immutable}
 import scala.collection.generic.Growable
 
 import scala.collection.mutable.{ArrayBuffer, HashMap}
@@ -57,23 +56,32 @@ import org.apache.spark.util._
  * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
  * cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.
  *
- * @param conf a Spark Config object describing the context configuration. Any settings in this
- *               config overrides the default configs as well as system properties.
- *
- * @param environment Environment variables to set on worker nodes.
+ * @param conf_ a Spark Config object describing the application configuration. Any settings in
+ *   this config override the default configs as well as system properties.
+ * @param preferredNodeLocationData used in YARN mode to select nodes to launch containers on. Can
+ *   be generated using [[org.apache.spark.scheduler.InputFormatInfo.computePreferredLocations]]
+ *   from a list of input files or InputFormats for the application.
  */
 class SparkContext(
-    val conf: SparkConf,
-    val environment: Map[String, String] = Map(),
+    conf_ : SparkConf,
     // This is used only by YARN for now, but should be relevant to other cluster types (Mesos, etc)
-    // too. This is typically generated from InputFormatInfo.computePreferredLocations .. host, set
-    // of data-local splits on host
-    val preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] = immutable.Map())
+    // too. This is typically generated from InputFormatInfo.computePreferredLocations. It contains
+    // a map from hostname to a list of input format splits on the host.
+    val preferredNodeLocationData: Map[String, Set[SplitInfo]] = Map())
   extends Logging {
 
   /**
-   * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
-   * cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.
+   * Alternative constructor that allows setting common Spark properties directly
+   *
+   * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
+   * @param appName A name for your application, to display on the cluster web UI
+   * @param conf a [[org.apache.spark.SparkConf]] object specifying other Spark parameters
+   */
+  def this(master: String, appName: String, conf: SparkConf) =
+    this(conf.setMaster(master).setAppName(appName))
+
+  /**
+   * Alternative constructor that allows setting common Spark properties directly
    *
    * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
    * @param appName A name for your application, to display on the cluster web UI.
@@ -82,24 +90,42 @@ class SparkContext(
    *             system or HDFS, HTTP, HTTPS, or FTP URLs.
    * @param environment Environment variables to set on worker nodes.
    */
-  def this(master: String, appName: String, sparkHome: String = null,
-    jars: Seq[String] = Nil, environment: Map[String, String] = Map(),
-    preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] =
-    immutable.Map()) =
-    this(new SparkConf(false).setAppName(appName).setMasterUrl(master)
-      .setJars(jars).set(environment.toSeq).setSparkHome(sparkHome),
-      environment, preferredNodeLocationData)
+  def this(
+      master: String,
+      appName: String,
+      sparkHome: String = null,
+      jars: Seq[String] = Nil,
+      environment: Map[String, String] = Map(),
+      preferredNodeLocationData: Map[String, Set[SplitInfo]] = Map()) =
+  {
+    this(
+      new SparkConf()
+        .setMaster(master)
+        .setAppName(appName)
+        .setJars(jars)
+        .setExecutorEnv(environment.toSeq)
+        .setSparkHome(sparkHome),
+      preferredNodeLocationData)
+  }
 
-  // Set Spark driver host and port system properties
-  Try(conf.get("spark.driver.host"))
-    .getOrElse(conf.set("spark.driver.host",  Utils.localHostName()))
+  val conf = conf_.clone()
+
+  if (!conf.contains("spark.master")) {
+    throw new SparkException("A master URL must be set in your configuration")
+  }
+  if (!conf.contains("spark.appName")) {
+    throw new SparkException("An application must be set in your configuration")
+  }
 
-  Try(conf.get("spark.driver.port"))
-    .getOrElse(conf.set("spark.driver.port",  "0"))
+  // Set Spark driver host and port system properties
+  conf.setIfMissing("spark.driver.host", Utils.localHostName())
+  conf.setIfMissing("spark.driver.port", "0")
 
-  val jars: Seq[String] = if (conf.getOrElse("spark.jars", null) != null) {
-    conf.get("spark.jars").split(",")
-  } else null
+  val jars: Seq[String] = if (conf.contains("spark.jars")) {
+    conf.get("spark.jars").split(",").filter(_.size != 0)
+  } else {
+    null
+  }
 
   val master = conf.get("spark.master")
   val appName = conf.get("spark.appName")
@@ -115,8 +141,8 @@ class SparkContext(
     conf.get("spark.driver.host"),
     conf.get("spark.driver.port").toInt,
     conf,
-    true,
-    isLocal)
+    isDriver = true,
+    isLocal = isLocal)
   SparkEnv.set(env)
 
   // Used to store a URL for each static file/jar together with the file's local timestamp
@@ -125,7 +151,8 @@ class SparkContext(
 
   // Keeps track of all persisted RDDs
   private[spark] val persistentRdds = new TimeStampedHashMap[Int, RDD[_]]
-  private[spark] val metadataCleaner = new MetadataCleaner(MetadataCleanerType.SPARK_CONTEXT, this.cleanup)
+  private[spark] val metadataCleaner =
+    new MetadataCleaner(MetadataCleanerType.SPARK_CONTEXT, this.cleanup, conf)
 
   // Initialize the Spark UI
   private[spark] val ui = new SparkUI(this)
@@ -135,9 +162,14 @@ class SparkContext(
 
   // Add each JAR given through the constructor
   if (jars != null) {
-    jars.foreach { addJar(_) }
+    jars.foreach(addJar)
   }
 
+  private[spark] val executorMemory = conf.getOption("spark.executor.memory")
+    .orElse(Option(System.getenv("SPARK_MEM")))
+    .map(Utils.memoryStringToMb)
+    .getOrElse(512)
+
   // Environment variables to pass to our executors
   private[spark] val executorEnvs = HashMap[String, String]()
   // Note: SPARK_MEM is included for Mesos, but overwritten for standalone mode in ExecutorRunner
@@ -148,10 +180,8 @@ class SparkContext(
     }
   }
   // Since memory can be set with a system property too, use that
-  executorEnvs("SPARK_MEM") = SparkContext.executorMemoryRequested + "m"
-  if (environment != null) {
-    executorEnvs ++= environment
-  }
+  executorEnvs("SPARK_MEM") = executorMemory + "m"
+  executorEnvs ++= conf.getExecutorEnv
 
   // Set SPARK_USER for user who is running SparkContext.
   val sparkUser = Option {
@@ -183,12 +213,12 @@ class SparkContext(
       hadoopConf.set("fs.s3n.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
     }
     // Copy any "spark.hadoop.foo=bar" system properties into conf as "foo=bar"
-    Utils.getSystemProperties.foreach { case (key, value) =>
+    conf.getAll.foreach { case (key, value) =>
       if (key.startsWith("spark.hadoop.")) {
         hadoopConf.set(key.substring("spark.hadoop.".length), value)
       }
     }
-    val bufferSize = conf.getOrElse("spark.buffer.size",  "65536")
+    val bufferSize = conf.getOrElse("spark.buffer.size", "65536")
     hadoopConf.set("io.file.buffer.size", bufferSize)
     hadoopConf
   }
@@ -200,7 +230,7 @@ class SparkContext(
     override protected def childValue(parent: Properties): Properties = new Properties(parent)
   }
 
-  private[spark] def getLocalProperties(): Properties = localProperties.get()
+  private[spark] def getLocalProperties: Properties = localProperties.get()
 
   private[spark] def setLocalProperties(props: Properties) {
     localProperties.set(props)
@@ -533,7 +563,7 @@ class SparkContext(
     // Fetch the file locally in case a job is executed locally.
     // Jobs that run through LocalScheduler will already fetch the required dependencies,
     // but jobs run in DAGScheduler.runLocally() will not so we must fetch the files here.
-    Utils.fetchFile(path, new File(SparkFiles.getRootDirectory))
+    Utils.fetchFile(path, new File(SparkFiles.getRootDirectory), conf)
 
     logInfo("Added file " + path + " at " + key + " with timestamp " + addedFiles(key))
   }
@@ -915,14 +945,6 @@ object SparkContext {
 
   private[spark] val SPARK_UNKNOWN_USER = "<unknown>"
 
-  private lazy val conf = new SparkConf()
-
-  private[spark] def globalConf = {
-    if (SparkEnv.get != null) {
-      SparkEnv.get.conf
-    } else conf
-  }
-
   implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] {
     def addInPlace(t1: Double, t2: Double): Double = t1 + t2
     def zero(initialValue: Double) = 0.0
@@ -1031,18 +1053,10 @@ object SparkContext {
   /** Find the JAR that contains the class of a particular object */
   def jarOfObject(obj: AnyRef): Seq[String] = jarOfClass(obj.getClass)
 
-  /** Get the amount of memory per executor requested through system properties or SPARK_MEM */
-  private[spark] val executorMemoryRequested = {
-    // TODO: Might need to add some extra memory for the non-heap parts of the JVM
-    Try(globalConf.get("spark.executor.memory")).toOption
-      .orElse(Option(System.getenv("SPARK_MEM")))
-      .map(Utils.memoryStringToMb)
-      .getOrElse(512)
-  }
-
   // Creates a task scheduler based on a given master URL. Extracted for testing.
-  private
-  def createTaskScheduler(sc: SparkContext, master: String, appName: String): TaskScheduler = {
+  private def createTaskScheduler(sc: SparkContext, master: String, appName: String)
+      : TaskScheduler =
+  {
     // Regular expression used for local[N] master format
     val LOCAL_N_REGEX = """local\[([0-9]+)\]""".r
     // Regular expression for local[N, maxRetries], used in tests with failing tasks
@@ -1076,10 +1090,10 @@ object SparkContext {
       case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
         // Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang.
         val memoryPerSlaveInt = memoryPerSlave.toInt
-        if (SparkContext.executorMemoryRequested > memoryPerSlaveInt) {
+        if (sc.executorMemory > memoryPerSlaveInt) {
           throw new SparkException(
             "Asked to launch cluster with %d MB RAM / worker but requested %d MB/worker".format(
-              memoryPerSlaveInt, SparkContext.executorMemoryRequested))
+              memoryPerSlaveInt, sc.executorMemory))
         }
 
         val scheduler = new ClusterScheduler(sc)
@@ -1137,7 +1151,7 @@ object SparkContext {
       case mesosUrl @ MESOS_REGEX(_) =>
         MesosNativeLibrary.load()
         val scheduler = new ClusterScheduler(sc)
-        val coarseGrained = globalConf.getOrElse("spark.mesos.coarse",  "false").toBoolean
+        val coarseGrained = sc.conf.getOrElse("spark.mesos.coarse", "false").toBoolean
         val url = mesosUrl.stripPrefix("mesos://") // strip scheme from raw Mesos URLs
         val backend = if (coarseGrained) {
           new CoarseMesosSchedulerBackend(scheduler, sc, url, appName)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/642029e7/core/src/main/scala/org/apache/spark/SparkEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 78e4ae2..34fad3e 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -107,7 +107,7 @@ object SparkEnv extends Logging {
   /**
    * Returns the ThreadLocal SparkEnv.
    */
-  def getThreadLocal : SparkEnv = {
+  def getThreadLocal: SparkEnv = {
 	  env.get()
   }
 
@@ -150,18 +150,19 @@ object SparkEnv extends Logging {
     val serializerManager = new SerializerManager
 
     val serializer = serializerManager.setDefault(
-      conf.getOrElse("spark.serializer",  "org.apache.spark.serializer.JavaSerializer"))
+      conf.getOrElse("spark.serializer", "org.apache.spark.serializer.JavaSerializer"), conf)
 
     val closureSerializer = serializerManager.get(
-      conf.getOrElse("spark.closure.serializer",  "org.apache.spark.serializer.JavaSerializer"))
+      conf.getOrElse("spark.closure.serializer", "org.apache.spark.serializer.JavaSerializer"),
+      conf)
 
     def registerOrLookup(name: String, newActor: => Actor): Either[ActorRef, ActorSelection] = {
       if (isDriver) {
         logInfo("Registering " + name)
         Left(actorSystem.actorOf(Props(newActor), name = name))
       } else {
-        val driverHost: String = conf.getOrElse("spark.driver.host",  "localhost")
-        val driverPort: Int = conf.getOrElse("spark.driver.port",  "7077").toInt
+        val driverHost: String = conf.getOrElse("spark.driver.host", "localhost")
+        val driverPort: Int = conf.getOrElse("spark.driver.port", "7077").toInt
         Utils.checkHost(driverHost, "Expected hostname")
         val url = "akka.tcp://spark@%s:%s/user/%s".format(driverHost, driverPort, name)
         logInfo("Connecting to " + name + ": " + url)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/642029e7/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index acf328a..e03cf9d 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -29,17 +29,22 @@ import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
 import com.google.common.base.Optional
 
-import org.apache.spark.{Accumulable, AccumulableParam, Accumulator, AccumulatorParam, SparkContext}
+import org.apache.spark._
 import org.apache.spark.SparkContext.IntAccumulatorParam
 import org.apache.spark.SparkContext.DoubleAccumulatorParam
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
+import scala.Tuple2
 
 /**
  * A Java-friendly version of [[org.apache.spark.SparkContext]] that returns [[org.apache.spark.api.java.JavaRDD]]s and
  * works with Java collections instead of Scala ones.
  */
 class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWorkaround {
+  /**
+   * @param conf a [[org.apache.spark.SparkConf]] object specifying Spark parameters
+   */
+  def this(conf: SparkConf) = this(new SparkContext(conf))
 
   /**
    * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
@@ -50,6 +55,14 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
   /**
    * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
    * @param appName A name for your application, to display on the cluster web UI
+   * @param conf a [[org.apache.spark.SparkConf]] object specifying other Spark parameters
+   */
+  def this(master: String, appName: String, conf: SparkConf) =
+    this(conf.setMaster(master).setAppName(appName))
+
+  /**
+   * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
+   * @param appName A name for your application, to display on the cluster web UI
    * @param sparkHome The SPARK_HOME directory on the slave nodes
    * @param jarFile JAR file to send to the cluster. This can be a path on the local file system
    *                or an HDFS, HTTP, HTTPS, or FTP URL.

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/642029e7/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index d6eacfe..05fd824 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -41,7 +41,7 @@ private[spark] class PythonRDD[T: ClassTag](
     accumulator: Accumulator[JList[Array[Byte]]])
   extends RDD[Array[Byte]](parent) {
 
-  val bufferSize = conf.getOrElse("spark.buffer.size",  "65536").toInt
+  val bufferSize = conf.getOrElse("spark.buffer.size", "65536").toInt
 
   override def getPartitions = parent.partitions
 
@@ -247,10 +247,10 @@ private class BytesToString extends org.apache.spark.api.java.function.Function[
  */
 private class PythonAccumulatorParam(@transient serverHost: String, serverPort: Int)
   extends AccumulatorParam[JList[Array[Byte]]] {
-  import SparkContext.{globalConf => conf}
+
   Utils.checkHost(serverHost, "Expected hostname")
 
-  val bufferSize = conf.getOrElse("spark.buffer.size",  "65536").toInt
+  val bufferSize = SparkEnv.get.conf.getOrElse("spark.buffer.size", "65536").toInt
 
   override def zero(value: JList[Array[Byte]]): JList[Array[Byte]] = new JArrayList
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/642029e7/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
index cecb8c2..47528bc 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
@@ -31,7 +31,7 @@ import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, TimeStampedH
 
 private[spark] class HttpBroadcast[T](@transient var value_ : T, isLocal: Boolean, id: Long)
   extends Broadcast[T](id) with Logging with Serializable {
-  
+
   def value = value_
 
   def blockId = BroadcastBlockId(id)
@@ -40,7 +40,7 @@ private[spark] class HttpBroadcast[T](@transient var value_ : T, isLocal: Boolea
     SparkEnv.get.blockManager.putSingle(blockId, value_, StorageLevel.MEMORY_AND_DISK, false)
   }
 
-  if (!isLocal) { 
+  if (!isLocal) {
     HttpBroadcast.write(id, value_)
   }
 
@@ -81,41 +81,48 @@ private object HttpBroadcast extends Logging {
   private var serverUri: String = null
   private var server: HttpServer = null
 
+  // TODO: This shouldn't be a global variable so that multiple SparkContexts can coexist
   private val files = new TimeStampedHashSet[String]
-  private val cleaner = new MetadataCleaner(MetadataCleanerType.HTTP_BROADCAST, cleanup)
+  private var cleaner: MetadataCleaner = null
 
-  private val httpReadTimeout = TimeUnit.MILLISECONDS.convert(5,TimeUnit.MINUTES).toInt
+  private val httpReadTimeout = TimeUnit.MILLISECONDS.convert(5, TimeUnit.MINUTES).toInt
 
-  private lazy val compressionCodec = CompressionCodec.createCodec()
+  private var compressionCodec: CompressionCodec = null
 
   def initialize(isDriver: Boolean, conf: SparkConf) {
     synchronized {
       if (!initialized) {
-        bufferSize = conf.getOrElse("spark.buffer.size",  "65536").toInt
-        compress = conf.getOrElse("spark.broadcast.compress",  "true").toBoolean
+        bufferSize = conf.getOrElse("spark.buffer.size", "65536").toInt
+        compress = conf.getOrElse("spark.broadcast.compress", "true").toBoolean
         if (isDriver) {
-          createServer()
+          createServer(conf)
           conf.set("spark.httpBroadcast.uri",  serverUri)
         }
         serverUri = conf.get("spark.httpBroadcast.uri")
+        cleaner = new MetadataCleaner(MetadataCleanerType.HTTP_BROADCAST, cleanup, conf)
+        compressionCodec = CompressionCodec.createCodec(conf)
         initialized = true
       }
     }
   }
-  
+
   def stop() {
     synchronized {
       if (server != null) {
         server.stop()
         server = null
       }
+      if (cleaner != null) {
+        cleaner.cancel()
+        cleaner = null
+      }
+      compressionCodec = null
       initialized = false
-      cleaner.cancel()
     }
   }
 
-  private def createServer() {
-    broadcastDir = Utils.createTempDir(Utils.getLocalDir)
+  private def createServer(conf: SparkConf) {
+    broadcastDir = Utils.createTempDir(Utils.getLocalDir(conf))
     server = new HttpServer(broadcastDir)
     server.start()
     serverUri = server.uri
@@ -143,7 +150,7 @@ private object HttpBroadcast extends Logging {
     val in = {
       val httpConnection = new URL(url).openConnection()
       httpConnection.setReadTimeout(httpReadTimeout)
-      val inputStream = httpConnection.getInputStream()
+      val inputStream = httpConnection.getInputStream
       if (compress) {
         compressionCodec.compressedInputStream(inputStream)
       } else {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/642029e7/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
index 4a3801d..00ec3b9 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -83,13 +83,13 @@ extends Broadcast[T](id) with Logging with Serializable {
         case None =>
           val start = System.nanoTime
           logInfo("Started reading broadcast variable " + id)
-          
+
           // Initialize @transient variables that will receive garbage values from the master.
           resetWorkerVariables()
 
           if (receiveBroadcast(id)) {
             value_ = TorrentBroadcast.unBlockifyObject[T](arrayOfBlocks, totalBytes, totalBlocks)
-            
+
             // Store the merged copy in cache so that the next worker doesn't need to rebuild it.
             // This creates a tradeoff between memory usage and latency.
             // Storing copy doubles the memory footprint; not storing doubles deserialization cost.
@@ -122,14 +122,14 @@ extends Broadcast[T](id) with Logging with Serializable {
     while (attemptId > 0 && totalBlocks == -1) {
       TorrentBroadcast.synchronized {
         SparkEnv.get.blockManager.getSingle(metaId) match {
-          case Some(x) => 
+          case Some(x) =>
             val tInfo = x.asInstanceOf[TorrentInfo]
             totalBlocks = tInfo.totalBlocks
             totalBytes = tInfo.totalBytes
             arrayOfBlocks = new Array[TorrentBlock](totalBlocks)
             hasBlocks = 0
-          
-          case None => 
+
+          case None =>
             Thread.sleep(500)
         }
       }
@@ -145,13 +145,13 @@ extends Broadcast[T](id) with Logging with Serializable {
       val pieceId = BroadcastHelperBlockId(broadcastId, "piece" + pid)
       TorrentBroadcast.synchronized {
         SparkEnv.get.blockManager.getSingle(pieceId) match {
-          case Some(x) => 
+          case Some(x) =>
             arrayOfBlocks(pid) = x.asInstanceOf[TorrentBlock]
             hasBlocks += 1
             SparkEnv.get.blockManager.putSingle(
               pieceId, arrayOfBlocks(pid), StorageLevel.MEMORY_AND_DISK, true)
-          
-          case None => 
+
+          case None =>
             throw new SparkException("Failed to get " + pieceId + " of " + broadcastId)
         }
       }
@@ -175,13 +175,13 @@ extends Logging {
       }
     }
   }
-  
+
   def stop() {
     initialized = false
   }
 
-  lazy val BLOCK_SIZE = conf.getOrElse("spark.broadcast.blockSize",  "4096").toInt * 1024
-  
+  lazy val BLOCK_SIZE = conf.getOrElse("spark.broadcast.blockSize", "4096").toInt * 1024
+
   def blockifyObject[T](obj: T): TorrentInfo = {
     val byteArray = Utils.serialize[T](obj)
     val bais = new ByteArrayInputStream(byteArray)
@@ -210,7 +210,7 @@ extends Logging {
   }
 
   def unBlockifyObject[T](arrayOfBlocks: Array[TorrentBlock],
-                            totalBytes: Int, 
+                            totalBytes: Int,
                             totalBlocks: Int): T = {
     var retByteArray = new Array[Byte](totalBytes)
     for (i <- 0 until totalBlocks) {
@@ -223,22 +223,22 @@ extends Logging {
 }
 
 private[spark] case class TorrentBlock(
-    blockID: Int, 
-    byteArray: Array[Byte]) 
+    blockID: Int,
+    byteArray: Array[Byte])
   extends Serializable
 
 private[spark] case class TorrentInfo(
     @transient arrayOfBlocks : Array[TorrentBlock],
-    totalBlocks: Int, 
-    totalBytes: Int) 
+    totalBlocks: Int,
+    totalBytes: Int)
   extends Serializable {
-  
-  @transient var hasBlocks = 0 
+
+  @transient var hasBlocks = 0
 }
 
 private[spark] class TorrentBroadcastFactory
   extends BroadcastFactory {
-  
+
   def initialize(isDriver: Boolean, conf: SparkConf) { TorrentBroadcast.initialize(isDriver, conf) }
 
   def newBroadcast[T](value_ : T, isLocal: Boolean, id: Long) =

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/642029e7/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
index dda43dc..19d393a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
@@ -26,7 +26,7 @@ private[spark] class ApplicationDescription(
     val appUiUrl: String)
   extends Serializable {
 
-  val user = System.getProperty("user.name",  "<unknown>")
+  val user = System.getProperty("user.name", "<unknown>")
 
   override def toString: String = "ApplicationDescription(" + name + ")"
 }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/642029e7/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
index 59d12a3..ffc0cb0 100644
--- a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
@@ -22,7 +22,7 @@ import akka.actor.ActorSystem
 import org.apache.spark.deploy.worker.Worker
 import org.apache.spark.deploy.master.Master
 import org.apache.spark.util.Utils
-import org.apache.spark.Logging
+import org.apache.spark.{SparkConf, Logging}
 
 import scala.collection.mutable.ArrayBuffer
 
@@ -43,7 +43,8 @@ class LocalSparkCluster(numWorkers: Int, coresPerWorker: Int, memoryPerWorker: I
     logInfo("Starting a local Spark cluster with " + numWorkers + " workers.")
 
     /* Start the Master */
-    val (masterSystem, masterPort, _) = Master.startSystemAndActor(localHostname, 0, 0)
+    val conf = new SparkConf(false)
+    val (masterSystem, masterPort, _) = Master.startSystemAndActor(localHostname, 0, 0, conf)
     masterActorSystems += masterSystem
     val masterUrl = "spark://" + localHostname + ":" + masterPort
     val masters = Array(masterUrl)
@@ -55,7 +56,7 @@ class LocalSparkCluster(numWorkers: Int, coresPerWorker: Int, memoryPerWorker: I
       workerActorSystems += workerSystem
     }
 
-    return masters
+    masters
   }
 
   def stop() {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/642029e7/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 1c979ac..4f402c1 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -34,10 +34,10 @@ class SparkHadoopUtil {
   UserGroupInformation.setConfiguration(conf)
 
   def runAsUser(user: String)(func: () => Unit) {
-    // if we are already running as the user intended there is no reason to do the doAs. It 
+    // if we are already running as the user intended there is no reason to do the doAs. It
     // will actually break secure HDFS access as it doesn't fill in the credentials. Also if
-    // the user is UNKNOWN then we shouldn't be creating a remote unknown user 
-    // (this is actually the path spark on yarn takes) since SPARK_USER is initialized only 
+    // the user is UNKNOWN then we shouldn't be creating a remote unknown user
+    // (this is actually the path spark on yarn takes) since SPARK_USER is initialized only
     // in SparkContext.
     val currentUser = Option(System.getProperty("user.name")).
       getOrElse(SparkContext.SPARK_UNKNOWN_USER)
@@ -67,12 +67,14 @@ class SparkHadoopUtil {
 }
 
 object SparkHadoopUtil {
-  import SparkContext.{globalConf => conf}
+
   private val hadoop = {
-    val yarnMode = java.lang.Boolean.valueOf(conf.getOrElse("SPARK_YARN_MODE",  System.getenv("SPARK_YARN_MODE")))
+    val yarnMode = java.lang.Boolean.valueOf(System.getenv("SPARK_YARN_MODE"))
     if (yarnMode) {
       try {
-        Class.forName("org.apache.spark.deploy.yarn.YarnSparkHadoopUtil").newInstance.asInstanceOf[SparkHadoopUtil]
+        Class.forName("org.apache.spark.deploy.yarn.YarnSparkHadoopUtil")
+          .newInstance()
+          .asInstanceOf[SparkHadoopUtil]
       } catch {
        case th: Throwable => throw new SparkException("Unable to load YARN support", th)
       }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/642029e7/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
index 426cf52..ef649fd 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.deploy.client
 
 import org.apache.spark.util.{Utils, AkkaUtils}
-import org.apache.spark.{SparkContext, Logging}
+import org.apache.spark.{SparkConf, SparkContext, Logging}
 import org.apache.spark.deploy.{Command, ApplicationDescription}
 
 private[spark] object TestClient {
@@ -46,11 +46,12 @@ private[spark] object TestClient {
   def main(args: Array[String]) {
     val url = args(0)
     val (actorSystem, port) = AkkaUtils.createActorSystem("spark", Utils.localIpAddress, 0,
-      conf = SparkContext.globalConf)
+      conf = new SparkConf)
     val desc = new ApplicationDescription(
-      "TestClient", 1, 512, Command("spark.deploy.client.TestExecutor", Seq(), Map()), "dummy-spark-home", "ignored")
+      "TestClient", 1, 512, Command("spark.deploy.client.TestExecutor", Seq(), Map()),
+      "dummy-spark-home", "ignored")
     val listener = new TestListener
-    val client = new Client(actorSystem, Array(url), desc, listener, SparkContext.globalConf)
+    val client = new Client(actorSystem, Array(url), desc, listener, new SparkConf)
     client.start()
     actorSystem.awaitTermination()
   }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/642029e7/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 2c162c4..9c89e36 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -29,7 +29,7 @@ import akka.pattern.ask
 import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
 import akka.serialization.SerializationExtension
 
-import org.apache.spark.{SparkContext, Logging, SparkException}
+import org.apache.spark.{SparkConf, SparkContext, Logging, SparkException}
 import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
 import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.deploy.master.MasterMessages._
@@ -38,14 +38,16 @@ import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.util.{AkkaUtils, Utils}
 
 private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging {
-  import context.dispatcher
-  val conf = SparkContext.globalConf
+  import context.dispatcher   // to use Akka's scheduler.schedule()
+
+  val conf = new SparkConf
+
   val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss")  // For application IDs
-  val WORKER_TIMEOUT = conf.getOrElse("spark.worker.timeout",  "60").toLong * 1000
-  val RETAINED_APPLICATIONS = conf.getOrElse("spark.deploy.retainedApplications",  "200").toInt
-  val REAPER_ITERATIONS = conf.getOrElse("spark.dead.worker.persistence",  "15").toInt
-  val RECOVERY_DIR = conf.getOrElse("spark.deploy.recoveryDirectory",  "")
-  val RECOVERY_MODE = conf.getOrElse("spark.deploy.recoveryMode",  "NONE")
+  val WORKER_TIMEOUT = conf.getOrElse("spark.worker.timeout", "60").toLong * 1000
+  val RETAINED_APPLICATIONS = conf.getOrElse("spark.deploy.retainedApplications", "200").toInt
+  val REAPER_ITERATIONS = conf.getOrElse("spark.dead.worker.persistence", "15").toInt
+  val RECOVERY_DIR = conf.getOrElse("spark.deploy.recoveryDirectory", "")
+  val RECOVERY_MODE = conf.getOrElse("spark.deploy.recoveryMode", "NONE")
 
   var nextAppNumber = 0
   val workers = new HashSet[WorkerInfo]
@@ -86,7 +88,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
   // As a temporary workaround before better ways of configuring memory, we allow users to set
   // a flag that will perform round-robin scheduling across the nodes (spreading out each app
   // among all the nodes) instead of trying to consolidate each app onto a small # of nodes.
-  val spreadOutApps = conf.getOrElse("spark.deploy.spreadOut",  "true").toBoolean
+  val spreadOutApps = conf.getOrElse("spark.deploy.spreadOut", "true").toBoolean
 
   override def preStart() {
     logInfo("Starting Spark master at " + masterUrl)
@@ -495,7 +497,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
         removeWorker(worker)
       } else {
         if (worker.lastHeartbeat < currentTime - ((REAPER_ITERATIONS + 1) * WORKER_TIMEOUT))
-          workers -= worker // we've seen this DEAD worker in the UI, etc. for long enough; cull it 
+          workers -= worker // we've seen this DEAD worker in the UI, etc. for long enough; cull it
       }
     }
   }
@@ -507,8 +509,9 @@ private[spark] object Master {
   val sparkUrlRegex = "spark://([^:]+):([0-9]+)".r
 
   def main(argStrings: Array[String]) {
-    val args = new MasterArguments(argStrings, SparkContext.globalConf)
-    val (actorSystem, _, _) = startSystemAndActor(args.host, args.port, args.webUiPort)
+    val conf = new SparkConf
+    val args = new MasterArguments(argStrings, conf)
+    val (actorSystem, _, _) = startSystemAndActor(args.host, args.port, args.webUiPort, conf)
     actorSystem.awaitTermination()
   }
 
@@ -522,11 +525,12 @@ private[spark] object Master {
     }
   }
 
-  def startSystemAndActor(host: String, port: Int, webUiPort: Int): (ActorSystem, Int, Int) = {
-    val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port,
-      conf = SparkContext.globalConf)
+  def startSystemAndActor(host: String, port: Int, webUiPort: Int, conf: SparkConf)
+      : (ActorSystem, Int, Int) =
+  {
+    val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port, conf = conf)
     val actor = actorSystem.actorOf(Props(classOf[Master], host, boundPort, webUiPort), actorName)
-    val timeout = AkkaUtils.askTimeout(SparkContext.globalConf)
+    val timeout = AkkaUtils.askTimeout(conf)
     val respFuture = actor.ask(RequestWebUIPort)(timeout)
     val resp = Await.result(respFuture, timeout).asInstanceOf[WebUIPortResponse]
     (actorSystem, boundPort, resp.webUIBoundPort)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/642029e7/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala b/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala
index 79d95b1..60c7a7c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala
@@ -37,7 +37,7 @@ import org.apache.spark.{SparkConf, Logging}
  */
 private[spark] class SparkZooKeeperSession(zkWatcher: SparkZooKeeperWatcher,
     conf: SparkConf) extends Logging {
-  val ZK_URL = conf.getOrElse("spark.deploy.zookeeper.url",  "")
+  val ZK_URL = conf.getOrElse("spark.deploy.zookeeper.url", "")
 
   val ZK_ACL = ZooDefs.Ids.OPEN_ACL_UNSAFE
   val ZK_TIMEOUT_MILLIS = 30000

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/642029e7/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
index df5bb36..a61597b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
@@ -28,7 +28,7 @@ private[spark] class ZooKeeperLeaderElectionAgent(val masterActor: ActorRef,
     masterUrl: String, conf: SparkConf)
   extends LeaderElectionAgent with SparkZooKeeperWatcher with Logging  {
 
-  val WORKING_DIR = conf.getOrElse("spark.deploy.zookeeper.dir",  "/spark") + "/leader_election"
+  val WORKING_DIR = conf.getOrElse("spark.deploy.zookeeper.dir", "/spark") + "/leader_election"
 
   private val watcher = new ZooKeeperWatcher()
   private val zk = new SparkZooKeeperSession(this, conf)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/642029e7/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
index c55b720..245a558 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
@@ -27,7 +27,7 @@ class ZooKeeperPersistenceEngine(serialization: Serialization, conf: SparkConf)
   with SparkZooKeeperWatcher
   with Logging
 {
-  val WORKING_DIR = conf.getOrElse("spark.deploy.zookeeper.dir",  "/spark") + "/master_status"
+  val WORKING_DIR = conf.getOrElse("spark.deploy.zookeeper.dir", "/spark") + "/master_status"
 
   val zk = new SparkZooKeeperSession(this, conf)
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/642029e7/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 75a6e75..f844fcb 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -55,7 +55,7 @@ private[spark] class Worker(
   val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss")  // For worker and executor IDs
 
   // Send a heartbeat every (heartbeat timeout) / 4 milliseconds
-  val HEARTBEAT_MILLIS = conf.getOrElse("spark.worker.timeout",  "60").toLong * 1000 / 4
+  val HEARTBEAT_MILLIS = conf.getOrElse("spark.worker.timeout", "60").toLong * 1000 / 4
 
   val REGISTRATION_TIMEOUT = 20.seconds
   val REGISTRATION_RETRIES = 3
@@ -267,7 +267,7 @@ private[spark] class Worker(
 }
 
 private[spark] object Worker {
-  import org.apache.spark.SparkContext.globalConf
+
   def main(argStrings: Array[String]) {
     val args = new WorkerArguments(argStrings)
     val (actorSystem, _) = startSystemAndActor(args.host, args.port, args.webUiPort, args.cores,
@@ -276,14 +276,16 @@ private[spark] object Worker {
   }
 
   def startSystemAndActor(host: String, port: Int, webUiPort: Int, cores: Int, memory: Int,
-    masterUrls: Array[String], workDir: String, workerNumber: Option[Int] = None)
-    : (ActorSystem, Int) = {
+      masterUrls: Array[String], workDir: String, workerNumber: Option[Int] = None)
+      : (ActorSystem, Int) =
+  {
     // The LocalSparkCluster runs multiple local sparkWorkerX actor systems
+    val conf = new SparkConf
     val systemName = "sparkWorker" + workerNumber.map(_.toString).getOrElse("")
     val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port,
-      conf = globalConf)
+      conf = conf)
     actorSystem.actorOf(Props(classOf[Worker], host, boundPort, webUiPort, cores, memory,
-      masterUrls, workDir, globalConf), name = "Worker")
+      masterUrls, workDir, conf), name = "Worker")
     (actorSystem, boundPort)
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/642029e7/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index c8319f6..53a2b94 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -22,7 +22,7 @@ import java.nio.ByteBuffer
 import akka.actor._
 import akka.remote._
 
-import org.apache.spark.{SparkContext, Logging}
+import org.apache.spark.{SparkConf, SparkContext, Logging}
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
 import org.apache.spark.util.{Utils, AkkaUtils}
@@ -98,7 +98,7 @@ private[spark] object CoarseGrainedExecutorBackend {
     // Create a new ActorSystem to run the backend, because we can't create a SparkEnv / Executor
     // before getting started with all our system properties, etc
     val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0,
-      indestructible = true, conf = SparkContext.globalConf)
+      indestructible = true, conf = new SparkConf)
     // set it
     val sparkHostPort = hostname + ":" + boundPort
 //    conf.set("spark.hostPort",  sparkHostPort)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/642029e7/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 70fc30e..a6eabc4 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -57,17 +57,18 @@ private[spark] class Executor(
 
   // Make sure the local hostname we report matches the cluster scheduler's name for this host
   Utils.setCustomHostname(slaveHostname)
+
+  // Set spark.* properties from executor arg
   val conf = new SparkConf(false)
-  // Set spark.* system properties from executor arg
-  for ((key, value) <- properties) {
-    conf.set(key,  value)
-  }
+  conf.setAll(properties)
 
   // If we are in yarn mode, systems can have different disk layouts so we must set it
   // to what Yarn on this system said was available. This will be used later when SparkEnv
   // created.
-  if (java.lang.Boolean.valueOf(System.getenv("SPARK_YARN_MODE"))) {
-    conf.set("spark.local.dir",  getYarnLocalDirs())
+  if (java.lang.Boolean.valueOf(
+      System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE"))))
+  {
+    conf.set("spark.local.dir", getYarnLocalDirs())
   }
 
   // Create our ClassLoader and set it on this thread
@@ -331,12 +332,12 @@ private[spark] class Executor(
       // Fetch missing dependencies
       for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name, -1L) < timestamp) {
         logInfo("Fetching " + name + " with timestamp " + timestamp)
-        Utils.fetchFile(name, new File(SparkFiles.getRootDirectory))
+        Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf)
         currentFiles(name) = timestamp
       }
       for ((name, timestamp) <- newJars if currentJars.getOrElse(name, -1L) < timestamp) {
         logInfo("Fetching " + name + " with timestamp " + timestamp)
-        Utils.fetchFile(name, new File(SparkFiles.getRootDirectory))
+        Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf)
         currentJars(name) = timestamp
         // Add it to our class loader
         val localName = name.split("/").last

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/642029e7/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
index 8ef5019..2040268 100644
--- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
+++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
@@ -22,7 +22,7 @@ import java.io.{InputStream, OutputStream}
 import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
 
 import org.xerial.snappy.{SnappyInputStream, SnappyOutputStream}
-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkEnv, SparkConf}
 
 
 /**
@@ -38,16 +38,15 @@ trait CompressionCodec {
 
 
 private[spark] object CompressionCodec {
-  import org.apache.spark.SparkContext.globalConf
-  def createCodec(): CompressionCodec = {
-    createCodec(System.getProperty(
+  def createCodec(conf: SparkConf): CompressionCodec = {
+    createCodec(conf, conf.getOrElse(
       "spark.io.compression.codec", classOf[LZFCompressionCodec].getName))
   }
 
-  def createCodec(codecName: String): CompressionCodec = {
+  def createCodec(conf: SparkConf, codecName: String): CompressionCodec = {
     val ctor = Class.forName(codecName, true, Thread.currentThread.getContextClassLoader)
       .getConstructor(classOf[SparkConf])
-      ctor.newInstance(globalConf).asInstanceOf[CompressionCodec]
+      ctor.newInstance(conf).asInstanceOf[CompressionCodec]
   }
 }
 
@@ -72,7 +71,7 @@ class LZFCompressionCodec(conf: SparkConf) extends CompressionCodec {
 class SnappyCompressionCodec(conf: SparkConf) extends CompressionCodec {
 
   override def compressedOutputStream(s: OutputStream): OutputStream = {
-    val blockSize = conf.getOrElse("spark.io.compression.snappy.block.size",  "32768").toInt
+    val blockSize = conf.getOrElse("spark.io.compression.snappy.block.size", "32768").toInt
     new SnappyOutputStream(s, blockSize)
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/642029e7/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
index 3e902f8..697096f 100644
--- a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
+++ b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
@@ -593,10 +593,8 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf) extends Loggi
 
 private[spark] object ConnectionManager {
 
-  import SparkContext.globalConf
-
   def main(args: Array[String]) {
-    val manager = new ConnectionManager(9999, globalConf)
+    val manager = new ConnectionManager(9999, new SparkConf)
     manager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
       println("Received [" + msg + "] from [" + id + "]")
       None

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/642029e7/core/src/main/scala/org/apache/spark/network/ReceiverTest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/ReceiverTest.scala b/core/src/main/scala/org/apache/spark/network/ReceiverTest.scala
index 4ca3cd3..1c9d603 100644
--- a/core/src/main/scala/org/apache/spark/network/ReceiverTest.scala
+++ b/core/src/main/scala/org/apache/spark/network/ReceiverTest.scala
@@ -19,19 +19,19 @@ package org.apache.spark.network
 
 import java.nio.ByteBuffer
 import java.net.InetAddress
+import org.apache.spark.SparkConf
 
 private[spark] object ReceiverTest {
-  import org.apache.spark.SparkContext.globalConf
   def main(args: Array[String]) {
-    val manager = new ConnectionManager(9999, globalConf)
+    val manager = new ConnectionManager(9999, new SparkConf)
     println("Started connection manager with id = " + manager.id)
-    
-    manager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => { 
+
+    manager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
       /*println("Received [" + msg + "] from [" + id + "] at " + System.currentTimeMillis)*/
-      val buffer = ByteBuffer.wrap("response".getBytes())
+      val buffer = ByteBuffer.wrap("response".getBytes)
       Some(Message.createBufferMessage(buffer, msg.id))
     })
-    Thread.currentThread.join()  
+    Thread.currentThread.join()
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/642029e7/core/src/main/scala/org/apache/spark/network/SenderTest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/SenderTest.scala b/core/src/main/scala/org/apache/spark/network/SenderTest.scala
index 11c21fc..dcbd183 100644
--- a/core/src/main/scala/org/apache/spark/network/SenderTest.scala
+++ b/core/src/main/scala/org/apache/spark/network/SenderTest.scala
@@ -19,29 +19,29 @@ package org.apache.spark.network
 
 import java.nio.ByteBuffer
 import java.net.InetAddress
+import org.apache.spark.SparkConf
 
 private[spark] object SenderTest {
-  import org.apache.spark.SparkContext.globalConf
   def main(args: Array[String]) {
-    
+
     if (args.length < 2) {
       println("Usage: SenderTest <target host> <target port>")
       System.exit(1)
     }
-   
+
     val targetHost = args(0)
     val targetPort = args(1).toInt
     val targetConnectionManagerId = new ConnectionManagerId(targetHost, targetPort)
 
-    val manager = new ConnectionManager(0, globalConf)
+    val manager = new ConnectionManager(0, new SparkConf)
     println("Started connection manager with id = " + manager.id)
 
-    manager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => { 
+    manager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
       println("Received [" + msg + "] from [" + id + "]")
       None
     })
-  
-    val size =  100 * 1024  * 1024 
+
+    val size =  100 * 1024  * 1024
     val buffer = ByteBuffer.allocate(size).put(Array.tabulate[Byte](size)(x => x.toByte))
     buffer.flip
 
@@ -50,7 +50,7 @@ private[spark] object SenderTest {
     val count = 100
     (0 until count).foreach(i => {
       val dataMessage = Message.createBufferMessage(buffer.duplicate)
-      val startTime = System.currentTimeMillis  
+      val startTime = System.currentTimeMillis
       /*println("Started timer at " + startTime)*/
       val responseStr = manager.sendMessageReliablySync(targetConnectionManagerId, dataMessage) match {
         case Some(response) =>

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/642029e7/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala b/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala
index 81b3104..db28ddf 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala
@@ -36,7 +36,7 @@ private[spark] class ShuffleCopier(conf: SparkConf) extends Logging {
       resultCollectCallback: (BlockId, Long, ByteBuf) => Unit) {
 
     val handler = new ShuffleCopier.ShuffleClientHandler(resultCollectCallback)
-    val connectTimeout = conf.getOrElse("spark.shuffle.netty.connect.timeout",  "60000").toInt
+    val connectTimeout = conf.getOrElse("spark.shuffle.netty.connect.timeout", "60000").toInt
     val fc = new FileClient(handler, connectTimeout)
 
     try {
@@ -104,10 +104,10 @@ private[spark] object ShuffleCopier extends Logging {
     val threads = if (args.length > 3) args(3).toInt else 10
 
     val copiers = Executors.newFixedThreadPool(80)
-    val tasks = (for (i <- Range(0, threads)) yield { 
+    val tasks = (for (i <- Range(0, threads)) yield {
       Executors.callable(new Runnable() {
         def run() {
-          val copier = new ShuffleCopier(SparkContext.globalConf)
+          val copier = new ShuffleCopier(new SparkConf)
           copier.getBlock(host, port, blockId, echoResultCollectCallBack)
         }
       })

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/642029e7/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
index 9fbe002..2897c4b 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
@@ -74,9 +74,6 @@ class CheckpointRDD[T: ClassTag](sc: SparkContext, val checkpointPath: String)
 }
 
 private[spark] object CheckpointRDD extends Logging {
-
-  import SparkContext.{globalConf => conf}
-
   def splitIdToFile(splitId: Int): String = {
     "part-%05d".format(splitId)
   }
@@ -94,7 +91,7 @@ private[spark] object CheckpointRDD extends Logging {
       throw new IOException("Checkpoint failed: temporary path " +
         tempOutputPath + " already exists")
     }
-    val bufferSize = conf.getOrElse("spark.buffer.size",  "65536").toInt
+    val bufferSize = env.conf.getOrElse("spark.buffer.size", "65536").toInt
 
     val fileOutputStream = if (blockSize < 0) {
       fs.create(tempOutputPath, false, bufferSize)
@@ -124,7 +121,7 @@ private[spark] object CheckpointRDD extends Logging {
   def readFromFile[T](path: Path, context: TaskContext): Iterator[T] = {
     val env = SparkEnv.get
     val fs = path.getFileSystem(SparkHadoopUtil.get.newConfiguration())
-    val bufferSize = conf.getOrElse("spark.buffer.size",  "65536").toInt
+    val bufferSize = env.conf.getOrElse("spark.buffer.size", "65536").toInt
     val fileInputStream = fs.open(path, bufferSize)
     val serializer = env.serializer.newInstance()
     val deserializeStream = serializer.deserializeStream(fileInputStream)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/642029e7/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index 911a002..4ba4696 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -114,7 +114,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
       map.changeValue(k, update)
     }
 
-    val ser = SparkEnv.get.serializerManager.get(serializerClass)
+    val ser = SparkEnv.get.serializerManager.get(serializerClass, SparkEnv.get.conf)
     for ((dep, depNum) <- split.deps.zipWithIndex) dep match {
       case NarrowCoGroupSplitDep(rdd, _, itsSplit) => {
         // Read them from the parent

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/642029e7/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala
index 3682c84..0ccb309 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala
@@ -59,7 +59,7 @@ class ShuffledRDD[K, V, P <: Product2[K, V] : ClassTag](
   override def compute(split: Partition, context: TaskContext): Iterator[P] = {
     val shuffledId = dependencies.head.asInstanceOf[ShuffleDependency[K, V]].shuffleId
     SparkEnv.get.shuffleFetcher.fetch[P](shuffledId, split.index, context,
-      SparkEnv.get.serializerManager.get(serializerClass))
+      SparkEnv.get.serializerManager.get(serializerClass, SparkEnv.get.conf))
   }
 
   override def clearDependencies() {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/642029e7/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala
index aab30b1..4f90c7d 100644
--- a/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala
@@ -93,7 +93,7 @@ private[spark] class SubtractedRDD[K: ClassTag, V: ClassTag, W: ClassTag](
 
   override def compute(p: Partition, context: TaskContext): Iterator[(K, V)] = {
     val partition = p.asInstanceOf[CoGroupPartition]
-    val serializer = SparkEnv.get.serializerManager.get(serializerClass)
+    val serializer = SparkEnv.get.serializerManager.get(serializerClass, SparkEnv.get.conf)
     val map = new JHashMap[K, ArrayBuffer[V]]
     def getSeq(k: K): ArrayBuffer[V] = {
       val seq = map.get(k)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/642029e7/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 963d15b..77aa24e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -158,7 +158,8 @@ class DAGScheduler(
   val activeJobs = new HashSet[ActiveJob]
   val resultStageToJob = new HashMap[Stage, ActiveJob]
 
-  val metadataCleaner = new MetadataCleaner(MetadataCleanerType.DAG_SCHEDULER, this.cleanup)
+  val metadataCleaner = new MetadataCleaner(
+    MetadataCleanerType.DAG_SCHEDULER, this.cleanup, env.conf)
 
   /**
    * Starts the event processing actor.  The actor has two responsibilities:
@@ -529,7 +530,7 @@ class DAGScheduler(
       case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) =>
         var finalStage: Stage = null
         try {
-          // New stage creation at times and if its not protected, the scheduler thread is killed. 
+          // New stage creation at times and if its not protected, the scheduler thread is killed.
           // e.g. it can fail when jobs are run on HadoopRDD whose underlying hdfs files have been deleted
           finalStage = newStage(rdd, partitions.size, None, jobId, Some(callSite))
         } catch {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/642029e7/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala
index 1791ee6..90eb8a7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala
@@ -32,7 +32,7 @@ import scala.collection.JavaConversions._
 /**
  * Parses and holds information about inputFormat (and files) specified as a parameter.
  */
-class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Class[_], 
+class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Class[_],
                       val path: String) extends Logging {
 
   var mapreduceInputFormat: Boolean = false
@@ -40,7 +40,7 @@ class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Cl
 
   validate()
 
-  override def toString(): String = {
+  override def toString: String = {
     "InputFormatInfo " + super.toString + " .. inputFormatClazz " + inputFormatClazz + ", path : " + path
   }
 
@@ -125,7 +125,7 @@ class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Cl
    }
 
   private def findPreferredLocations(): Set[SplitInfo] = {
-    logDebug("mapreduceInputFormat : " + mapreduceInputFormat + ", mapredInputFormat : " + mapredInputFormat + 
+    logDebug("mapreduceInputFormat : " + mapreduceInputFormat + ", mapredInputFormat : " + mapredInputFormat +
       ", inputFormatClazz : " + inputFormatClazz)
     if (mapreduceInputFormat) {
       return prefLocsFromMapreduceInputFormat()
@@ -143,14 +143,14 @@ class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Cl
 object InputFormatInfo {
   /**
     Computes the preferred locations based on input(s) and returned a location to block map.
-    Typical use of this method for allocation would follow some algo like this 
-    (which is what we currently do in YARN branch) :
+    Typical use of this method for allocation would follow some algo like this:
+
     a) For each host, count number of splits hosted on that host.
     b) Decrement the currently allocated containers on that host.
     c) Compute rack info for each host and update rack -> count map based on (b).
     d) Allocate nodes based on (c)
-    e) On the allocation result, ensure that we dont allocate "too many" jobs on a single node 
-       (even if data locality on that is very high) : this is to prevent fragility of job if a single 
+    e) On the allocation result, ensure that we dont allocate "too many" jobs on a single node
+       (even if data locality on that is very high) : this is to prevent fragility of job if a single
        (or small set of) hosts go down.
 
     go to (a) until required nodes are allocated.

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/642029e7/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
index 3f55cd5..6092783 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
@@ -41,7 +41,7 @@ import org.apache.spark.storage.StorageLevel
 class JobLogger(val user: String, val logDirName: String)
   extends SparkListener with Logging {
 
-  def this() = this(System.getProperty("user.name",  "<unknown>"),
+  def this() = this(System.getProperty("user.name", "<unknown>"),
     String.valueOf(System.currentTimeMillis()))
 
   private val logDir =

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/642029e7/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
index 310ec62..28f3ba5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
@@ -32,7 +32,9 @@ private[spark] object ResultTask {
   // expensive on the master node if it needs to launch thousands of tasks.
   val serializedInfoCache = new TimeStampedHashMap[Int, Array[Byte]]
 
-  val metadataCleaner = new MetadataCleaner(MetadataCleanerType.RESULT_TASK, serializedInfoCache.clearOldValues)
+  // TODO: This object shouldn't have global variables
+  val metadataCleaner = new MetadataCleaner(
+    MetadataCleanerType.RESULT_TASK, serializedInfoCache.clearOldValues, new SparkConf)
 
   def serializeInfo(stageId: Int, rdd: RDD[_], func: (TaskContext, Iterator[_]) => _): Array[Byte] = {
     synchronized {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/642029e7/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
index 9002d33..3cf995e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
@@ -52,7 +52,7 @@ private[spark] class FIFOSchedulableBuilder(val rootPool: Pool)
 private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
   extends SchedulableBuilder with Logging {
 
-  val schedulerAllocFile = Option(conf.get("spark.scheduler.allocation.file"))
+  val schedulerAllocFile = conf.getOption("spark.scheduler.allocation.file")
   val DEFAULT_SCHEDULER_FILE = "fairscheduler.xml"
   val FAIR_SCHEDULER_PROPERTIES = "spark.scheduler.pool"
   val DEFAULT_POOL_NAME = "default"

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/642029e7/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
index 0f2deb4..a37ead5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
@@ -37,7 +37,9 @@ private[spark] object ShuffleMapTask {
   // expensive on the master node if it needs to launch thousands of tasks.
   val serializedInfoCache = new TimeStampedHashMap[Int, Array[Byte]]
 
-  val metadataCleaner = new MetadataCleaner(MetadataCleanerType.SHUFFLE_MAP_TASK, serializedInfoCache.clearOldValues)
+  // TODO: This object shouldn't have global variables
+  val metadataCleaner = new MetadataCleaner(
+    MetadataCleanerType.SHUFFLE_MAP_TASK, serializedInfoCache.clearOldValues, new SparkConf)
 
   def serializeInfo(stageId: Int, rdd: RDD[_], dep: ShuffleDependency[_,_]): Array[Byte] = {
     synchronized {
@@ -152,7 +154,7 @@ private[spark] class ShuffleMapTask(
 
     try {
       // Obtain all the block writers for shuffle blocks.
-      val ser = SparkEnv.get.serializerManager.get(dep.serializerClass)
+      val ser = SparkEnv.get.serializerManager.get(dep.serializerClass, SparkEnv.get.conf)
       shuffle = shuffleBlockManager.forMapTask(dep.shuffleId, partitionId, numOutputSplits, ser)
 
       // Write the map output to its associated buckets.

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/642029e7/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
index 7e231ec..2707740 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
@@ -51,10 +51,10 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
 {
   val conf = sc.conf
   // How often to check for speculative tasks
-  val SPECULATION_INTERVAL = conf.getOrElse("spark.speculation.interval",  "100").toLong
+  val SPECULATION_INTERVAL = conf.getOrElse("spark.speculation.interval", "100").toLong
 
   // Threshold above which we warn user initial TaskSet may be starved
-  val STARVATION_TIMEOUT = conf.getOrElse("spark.starvation.timeout",  "15000").toLong
+  val STARVATION_TIMEOUT = conf.getOrElse("spark.starvation.timeout", "15000").toLong
 
   // ClusterTaskSetManagers are not thread safe, so any access to one should be synchronized
   // on this class.
@@ -91,7 +91,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
   var rootPool: Pool = null
   // default scheduler is FIFO
   val schedulingMode: SchedulingMode = SchedulingMode.withName(
-    conf.getOrElse("spark.scheduler.mode",  "FIFO"))
+    conf.getOrElse("spark.scheduler.mode", "FIFO"))
 
   // This is a var so that we can reset it for testing purposes.
   private[spark] var taskResultGetter = new TaskResultGetter(sc.env, this)
@@ -120,7 +120,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
   override def start() {
     backend.start()
 
-    if (conf.getOrElse("spark.speculation",  "false").toBoolean) {
+    if (conf.getOrElse("spark.speculation", "false").toBoolean) {
       logInfo("Starting speculative execution thread")
       import sc.env.actorSystem.dispatcher
       sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds,