Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/02 06:29:41 UTC

[03/33] git commit: spark-544, introducing SparkConf and related configuration overhaul.

spark-544, introducing SparkConf and related configuration overhaul.
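
In practical terms, settings that were previously read and written through Java system properties now travel through a serializable SparkConf object that is threaded into SparkContext, SparkEnv, the schedulers, broadcast, storage and metrics code, as the diff below shows. A minimal sketch of the new entry point, assuming code that lives inside the org.apache.spark package (SparkConf is still private[spark] in this commit); the master URL, application name and memory setting are illustrative values, not taken from the patch:

    import org.apache.spark.{SparkConf, SparkContext}

    // Build the configuration explicitly instead of calling System.setProperty.
    val conf = new SparkConf()                  // default: also folds in system properties / spark.conf
      .setMasterUrl("local[4]")                 // stored under "spark.master"
      .setAppName("ConfDemo")                   // stored under "spark.appName"
      .set("spark.executor.memory", "512m")     // free-form key/value settings

    // SparkContext now takes the conf as its primary argument; the old
    // (master, appName, sparkHome, jars, ...) constructor delegates to it.
    val sc = new SparkContext(conf)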


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/2573add9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/2573add9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/2573add9

Branch: refs/heads/master
Commit: 2573add94cf920a88f74d80d8ea94218d812704d
Parents: 0bc57c5
Author: Prashant Sharma <pr...@imaginea.com>
Authored: Tue Dec 24 18:30:31 2013 +0530
Committer: Prashant Sharma <sc...@gmail.com>
Committed: Wed Dec 25 00:09:36 2013 +0530

----------------------------------------------------------------------
 .../org/apache/spark/MapOutputTracker.scala     |   7 +-
 .../scala/org/apache/spark/Partitioner.scala    |   4 +-
 .../main/scala/org/apache/spark/SparkConf.scala |  71 ++++++++++
 .../scala/org/apache/spark/SparkContext.scala   | 140 ++++++++++---------
 .../main/scala/org/apache/spark/SparkEnv.scala  |  44 +++---
 .../org/apache/spark/api/python/PythonRDD.scala |   6 +-
 .../org/apache/spark/broadcast/Broadcast.scala  |   6 +-
 .../spark/broadcast/BroadcastFactory.scala      |   4 +-
 .../apache/spark/broadcast/HttpBroadcast.scala  |  14 +-
 .../spark/broadcast/TorrentBroadcast.scala      |   9 +-
 .../spark/deploy/ApplicationDescription.scala   |   2 +-
 .../apache/spark/deploy/SparkHadoopUtil.scala   |   3 +-
 .../org/apache/spark/deploy/client/Client.scala |  13 +-
 .../apache/spark/deploy/client/TestClient.scala |   7 +-
 .../org/apache/spark/deploy/master/Master.scala |  31 ++--
 .../spark/deploy/master/MasterArguments.scala   |   7 +-
 .../deploy/master/SparkZooKeeperSession.scala   |   7 +-
 .../master/ZooKeeperLeaderElectionAgent.scala   |   9 +-
 .../master/ZooKeeperPersistenceEngine.scala     |   8 +-
 .../spark/deploy/master/ui/MasterWebUI.scala    |   2 +-
 .../org/apache/spark/deploy/worker/Worker.scala |  28 ++--
 .../spark/deploy/worker/ui/WorkerWebUI.scala    |   6 +-
 .../executor/CoarseGrainedExecutorBackend.scala |   6 +-
 .../org/apache/spark/executor/Executor.scala    |  10 +-
 .../org/apache/spark/io/CompressionCodec.scala  |  14 +-
 .../apache/spark/metrics/MetricsSystem.scala    |  10 +-
 .../spark/network/ConnectionManager.scala       |  24 ++--
 .../org/apache/spark/network/ReceiverTest.scala |   4 +-
 .../org/apache/spark/network/SenderTest.scala   |   4 +-
 .../spark/network/netty/ShuffleCopier.scala     |   8 +-
 .../org/apache/spark/rdd/CheckpointRDD.scala    |   6 +-
 .../main/scala/org/apache/spark/rdd/RDD.scala   |   1 +
 .../org/apache/spark/scheduler/JobLogger.scala  |   2 +-
 .../spark/scheduler/SchedulableBuilder.scala    |   6 +-
 .../scheduler/cluster/ClusterScheduler.scala    |  11 +-
 .../cluster/ClusterTaskSetManager.scala         |  19 +--
 .../cluster/CoarseGrainedSchedulerBackend.scala |  12 +-
 .../cluster/SimrSchedulerBackend.scala          |   4 +-
 .../cluster/SparkDeploySchedulerBackend.scala   |   6 +-
 .../scheduler/cluster/TaskResultGetter.scala    |   3 +-
 .../mesos/CoarseMesosSchedulerBackend.scala     |  10 +-
 .../cluster/mesos/MesosSchedulerBackend.scala   |   4 +-
 .../spark/scheduler/local/LocalScheduler.scala  |   5 +-
 .../spark/serializer/KryoSerializer.scala       |  10 +-
 .../spark/storage/BlockFetcherIterator.scala    |   4 +-
 .../org/apache/spark/storage/BlockManager.scala |  38 ++---
 .../spark/storage/BlockManagerMaster.scala      |  11 +-
 .../spark/storage/BlockManagerMasterActor.scala |  10 +-
 .../spark/storage/BlockObjectWriter.scala       |   5 +-
 .../apache/spark/storage/DiskBlockManager.scala |   2 +-
 .../spark/storage/ShuffleBlockManager.scala     |   7 +-
 .../apache/spark/storage/ThreadingTest.scala    |   6 +-
 .../scala/org/apache/spark/ui/SparkUI.scala     |   2 +-
 .../apache/spark/ui/UIWorkloadGenerator.scala   |   4 +-
 .../org/apache/spark/ui/env/EnvironmentUI.scala |   2 +-
 .../spark/ui/jobs/JobProgressListener.scala     |   2 +-
 .../scala/org/apache/spark/util/AkkaUtils.scala |  25 ++--
 .../org/apache/spark/util/MetadataCleaner.scala |  12 +-
 .../org/apache/spark/util/SizeEstimator.scala   |   7 +-
 .../scala/org/apache/spark/util/Utils.scala     |   7 +-
 .../apache/spark/MapOutputTrackerSuite.scala    |  16 +--
 .../spark/metrics/MetricsSystemSuite.scala      |   8 +-
 .../spark/scheduler/DAGSchedulerSuite.scala     |  23 ++-
 .../apache/spark/scheduler/JobLoggerSuite.scala |   2 +-
 .../cluster/ClusterSchedulerSuite.scala         |   2 +-
 .../cluster/ClusterTaskSetManagerSuite.scala    |   4 +-
 .../cluster/TaskResultGetterSuite.scala         |   2 +-
 .../spark/storage/BlockManagerSuite.scala       |  95 +++++++------
 .../spark/storage/DiskBlockManagerSuite.scala   |  12 +-
 .../apache/spark/util/SizeEstimatorSuite.scala  |   4 +-
 .../examples/bagel/WikipediaPageRank.scala      |  10 +-
 .../bagel/WikipediaPageRankStandalone.scala     |   8 +-
 .../streaming/examples/ActorWordCount.scala     |   3 +-
 .../apache/spark/mllib/recommendation/ALS.scala |  13 +-
 .../spark/deploy/yarn/ApplicationMaster.scala   |  16 +--
 .../org/apache/spark/deploy/yarn/Client.scala   |   4 +-
 .../spark/deploy/yarn/ClientArguments.scala     |   2 +-
 .../spark/deploy/yarn/WorkerLauncher.scala      |   4 +-
 .../deploy/yarn/YarnAllocationHandler.scala     |   4 +-
 .../cluster/YarnClientSchedulerBackend.scala    |   4 +-
 project/SparkBuild.scala                        |   3 +-
 .../org/apache/spark/repl/SparkILoop.scala      |   7 +-
 .../org/apache/spark/repl/SparkIMain.scala      |   7 +-
 .../org/apache/spark/streaming/Checkpoint.scala |   3 +-
 .../org/apache/spark/streaming/Scheduler.scala  |   6 +-
 .../spark/streaming/StreamingContext.scala      |   2 +-
 .../streaming/dstream/NetworkInputDStream.scala |   6 +-
 .../spark/streaming/CheckpointSuite.scala       |   6 +-
 .../spark/streaming/InputStreamsSuite.scala     |  18 +--
 .../apache/spark/streaming/TestSuiteBase.scala  |  11 +-
 .../spark/deploy/yarn/ApplicationMaster.scala   |  16 +--
 .../org/apache/spark/deploy/yarn/Client.scala   |   6 +-
 .../spark/deploy/yarn/ClientArguments.scala     |   2 +-
 .../spark/deploy/yarn/WorkerLauncher.scala      |   4 +-
 .../deploy/yarn/YarnAllocationHandler.scala     |   2 +-
 .../cluster/YarnClientSchedulerBackend.scala    |   4 +-
 96 files changed, 612 insertions(+), 478 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2573add9/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index ccffcc3..4520edb 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -50,9 +50,9 @@ private[spark] class MapOutputTrackerMasterActor(tracker: MapOutputTrackerMaster
   }
 }
 
-private[spark] class MapOutputTracker extends Logging {
+private[spark] class MapOutputTracker(conf: SparkConf) extends Logging {
 
-  private val timeout = AkkaUtils.askTimeout
+  private val timeout = AkkaUtils.askTimeout(conf)
 
   // Set to the MapOutputTrackerActor living on the driver
   var trackerActor: Either[ActorRef, ActorSelection] = _
@@ -192,7 +192,8 @@ private[spark] class MapOutputTracker extends Logging {
   }
 }
 
-private[spark] class MapOutputTrackerMaster extends MapOutputTracker {
+private[spark] class MapOutputTrackerMaster(conf: SparkConf)
+  extends MapOutputTracker(conf) {
 
   // Cache a serialized version of the output statuses for each shuffle to send them out faster
   private var cacheEpoch = epoch

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2573add9/core/src/main/scala/org/apache/spark/Partitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala
index bcec41c..04c1eed 100644
--- a/core/src/main/scala/org/apache/spark/Partitioner.scala
+++ b/core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -32,6 +32,8 @@ abstract class Partitioner extends Serializable {
 }
 
 object Partitioner {
+
+  import SparkContext.{globalConf => conf}
   /**
    * Choose a partitioner to use for a cogroup-like operation between a number of RDDs.
    *
@@ -52,7 +54,7 @@ object Partitioner {
     for (r <- bySize if r.partitioner != None) {
       return r.partitioner.get
     }
-    if (System.getProperty("spark.default.parallelism") != null) {
+    if (conf.getOrElse("spark.default.parallelism", null) != null) {
       return new HashPartitioner(rdd.context.defaultParallelism)
     } else {
       return new HashPartitioner(bySize.head.partitions.size)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2573add9/core/src/main/scala/org/apache/spark/SparkConf.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
new file mode 100644
index 0000000..9a4eefa
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -0,0 +1,71 @@
+package org.apache.spark
+
+import scala.collection.JavaConversions._
+import scala.collection.concurrent.TrieMap
+
+import com.typesafe.config.ConfigFactory
+
+private[spark] class SparkConf(loadClasspathRes: Boolean = true) extends Serializable {
+  @transient lazy val config = ConfigFactory.systemProperties()
+    .withFallback(ConfigFactory.parseResources("spark.conf"))
+  // TODO this should actually be synchronized
+  private val configMap = TrieMap[String, String]()
+
+  if (loadClasspathRes && !config.entrySet().isEmpty) {
+    for (e <- config.entrySet()) {
+      configMap += ((e.getKey, e.getValue.unwrapped().toString))
+    }
+  }
+
+  def setMasterUrl(master: String) = {
+    if (master != null)
+      configMap += (("spark.master", master))
+    this
+  }
+
+  def setAppName(name: String) = {
+    if (name != null)
+      configMap += (("spark.appName", name))
+    this
+  }
+
+  def setJars(jars: Seq[String]) = {
+    if (!jars.isEmpty)
+      configMap += (("spark.jars", jars.mkString(",")))
+    this
+  }
+
+  def set(k: String, value: String) = {
+    configMap += ((k, value))
+    this
+  }
+
+  def setSparkHome(home: String) = {
+    if (home != null)
+      configMap += (("spark.home", home))
+    this
+  }
+
+  def set(map: Seq[(String, String)]) = {
+    if (map != null && !map.isEmpty)
+      configMap ++= map
+    this
+  }
+
+  def get(k: String): String = {
+    configMap(k)
+  }
+
+  def getAllConfiguration = configMap.clone.entrySet().iterator
+
+  def getOrElse(k: String, defaultValue: String): String = {
+    configMap.getOrElse(k, defaultValue)
+  }
+
+  override def clone: SparkConf = {
+    val conf = new SparkConf(false)
+    conf.set(configMap.toSeq)
+    conf
+  }
+
+}
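
Note the lookup semantics of the new class: get(k) delegates to configMap(k) and so throws NoSuchElementException when the key is absent, which is why call sites elsewhere in this patch either wrap it in Try(...) or reach for getOrElse with a default. A hedged illustration of the three access patterns (key names come from the diff, the values are made up):

    import scala.util.Try
    import org.apache.spark.SparkConf

    val conf = new SparkConf(false)                   // false: skip system-property / spark.conf loading
      .set("spark.master", "local[4]")

    conf.get("spark.master")                          // "local[4]"
    conf.getOrElse("spark.driver.port", "0")          // "0"   -- key absent, default returned
    Try(conf.get("spark.driver.host")).toOption       // None  -- instead of a thrown exception
    // conf.get("spark.driver.host")                  // would throw NoSuchElementException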

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2573add9/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index a0f794e..4300b07 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -22,91 +22,99 @@ import java.net.URI
 import java.util.Properties
 import java.util.concurrent.atomic.AtomicInteger
 
-import scala.collection.Map
+import scala.collection.{Map, immutable}
+import scala.collection.JavaConversions._
 import scala.collection.generic.Growable
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.HashMap
+
+import scala.collection.mutable.{ArrayBuffer, HashMap}
 import scala.reflect.{ClassTag, classTag}
+import scala.util.Try
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
-import org.apache.hadoop.io.ArrayWritable
-import org.apache.hadoop.io.BooleanWritable
-import org.apache.hadoop.io.BytesWritable
-import org.apache.hadoop.io.DoubleWritable
-import org.apache.hadoop.io.FloatWritable
-import org.apache.hadoop.io.IntWritable
-import org.apache.hadoop.io.LongWritable
-import org.apache.hadoop.io.NullWritable
-import org.apache.hadoop.io.Text
-import org.apache.hadoop.io.Writable
-import org.apache.hadoop.mapred.FileInputFormat
-import org.apache.hadoop.mapred.InputFormat
-import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.mapred.SequenceFileInputFormat
-import org.apache.hadoop.mapred.TextInputFormat
-import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
-import org.apache.hadoop.mapreduce.{Job => NewHadoopJob}
+import org.apache.hadoop.io.{ArrayWritable, BooleanWritable, BytesWritable, DoubleWritable,
+FloatWritable, IntWritable, LongWritable, NullWritable, Text, Writable}
+import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf, SequenceFileInputFormat,
+TextInputFormat}
+import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, Job => NewHadoopJob}
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
-
 import org.apache.mesos.MesosNativeLibrary
 
 import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
 import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd._
 import org.apache.spark.scheduler._
-import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend,
-  SparkDeploySchedulerBackend, ClusterScheduler, SimrSchedulerBackend}
-import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
+import org.apache.spark.scheduler.cluster.{ClusterScheduler, CoarseGrainedSchedulerBackend,
+SimrSchedulerBackend, SparkDeploySchedulerBackend}
+import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend,
+MesosSchedulerBackend}
 import org.apache.spark.scheduler.local.LocalScheduler
-import org.apache.spark.scheduler.StageInfo
 import org.apache.spark.storage.{BlockManagerSource, RDDInfo, StorageStatus, StorageUtils}
 import org.apache.spark.ui.SparkUI
-import org.apache.spark.util.{ClosureCleaner, MetadataCleaner, MetadataCleanerType,
-  TimeStampedHashMap, Utils}
+import org.apache.spark.util._
 
 /**
  * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
  * cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.
  *
- * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
- * @param appName A name for your application, to display on the cluster web UI.
- * @param sparkHome Location where Spark is installed on cluster nodes.
- * @param jars Collection of JARs to send to the cluster. These can be paths on the local file
- *             system or HDFS, HTTP, HTTPS, or FTP URLs.
+ * @param conf a Spark Config object describing the context configuration. Any settings in this
+ *               config overrides the default configs as well as system properties.
+ *
  * @param environment Environment variables to set on worker nodes.
  */
 class SparkContext(
-    val master: String,
-    val appName: String,
-    val sparkHome: String = null,
-    val jars: Seq[String] = Nil,
+    val conf: SparkConf,
     val environment: Map[String, String] = Map(),
     // This is used only by YARN for now, but should be relevant to other cluster types (Mesos, etc)
     // too. This is typically generated from InputFormatInfo.computePreferredLocations .. host, set
     // of data-local splits on host
-    val preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] =
-      scala.collection.immutable.Map())
+    val preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] = immutable.Map())
   extends Logging {
 
-  // Ensure logging is initialized before we spawn any threads
-  initLogging()
+  /**
+   * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
+   * cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.
+   *
+   * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
+   * @param appName A name for your application, to display on the cluster web UI.
+   * @param sparkHome Location where Spark is installed on cluster nodes.
+   * @param jars Collection of JARs to send to the cluster. These can be paths on the local file
+   *             system or HDFS, HTTP, HTTPS, or FTP URLs.
+   * @param environment Environment variables to set on worker nodes.
+   */
+  def this(master: String, appName: String, sparkHome: String = null,
+    jars: Seq[String] = Nil, environment: Map[String, String] = Map(),
+    preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] =
+    immutable.Map()) =
+    this(new SparkConf(false).setAppName(appName).setMasterUrl(master)
+      .setJars(jars).set(environment.toSeq).setSparkHome(sparkHome),
+      environment, preferredNodeLocationData)
 
   // Set Spark driver host and port system properties
-  if (System.getProperty("spark.driver.host") == null) {
-    System.setProperty("spark.driver.host", Utils.localHostName())
-  }
-  if (System.getProperty("spark.driver.port") == null) {
-    System.setProperty("spark.driver.port", "0")
-  }
+  Try(conf.get("spark.driver.host"))
+    .getOrElse(conf.set("spark.driver.host",  Utils.localHostName()))
+
+  Try(conf.get("spark.driver.port"))
+    .getOrElse(conf.set("spark.driver.port",  "0"))
+
+  val jars: Seq[String] = if (conf.getOrElse("spark.jars", null) != null) {
+    conf.get("spark.jars").split(",")
+  } else null
+
+  val master = conf.get("spark.master")
+  val appName = conf.get("spark.appName")
 
   val isLocal = (master == "local" || master.startsWith("local["))
 
+  // Ensure logging is initialized before we spawn any threads
+  initLogging()
+
   // Create the Spark execution environment (cache, map output tracker, etc)
   private[spark] val env = SparkEnv.createFromSystemProperties(
     "<driver>",
-    System.getProperty("spark.driver.host"),
-    System.getProperty("spark.driver.port").toInt,
+    conf.get("spark.driver.host"),
+    conf.get("spark.driver.port").toInt,
+    conf,
     true,
     isLocal)
   SparkEnv.set(env)
@@ -165,24 +173,24 @@ class SparkContext(
   /** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
   val hadoopConfiguration = {
     val env = SparkEnv.get
-    val conf = SparkHadoopUtil.get.newConfiguration()
+    val hadoopConf = SparkHadoopUtil.get.newConfiguration()
     // Explicitly check for S3 environment variables
     if (System.getenv("AWS_ACCESS_KEY_ID") != null &&
         System.getenv("AWS_SECRET_ACCESS_KEY") != null) {
-      conf.set("fs.s3.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
-      conf.set("fs.s3n.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
-      conf.set("fs.s3.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
-      conf.set("fs.s3n.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
+      hadoopConf.set("fs.s3.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
+      hadoopConf.set("fs.s3n.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
+      hadoopConf.set("fs.s3.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
+      hadoopConf.set("fs.s3n.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
     }
     // Copy any "spark.hadoop.foo=bar" system properties into conf as "foo=bar"
     Utils.getSystemProperties.foreach { case (key, value) =>
       if (key.startsWith("spark.hadoop.")) {
-        conf.set(key.substring("spark.hadoop.".length), value)
+        hadoopConf.set(key.substring("spark.hadoop.".length), value)
       }
     }
-    val bufferSize = System.getProperty("spark.buffer.size", "65536")
-    conf.set("io.file.buffer.size", bufferSize)
-    conf
+    val bufferSize = conf.getOrElse("spark.buffer.size",  "65536")
+    hadoopConf.set("io.file.buffer.size", bufferSize)
+    hadoopConf
   }
 
   private[spark] var checkpointDir: Option[String] = None
@@ -695,10 +703,8 @@ class SparkContext(
    * (in that order of preference). If neither of these is set, return None.
    */
   private[spark] def getSparkHome(): Option[String] = {
-    if (sparkHome != null) {
-      Some(sparkHome)
-    } else if (System.getProperty("spark.home") != null) {
-      Some(System.getProperty("spark.home"))
+    if (conf.getOrElse("spark.home", null) != null) {
+      Some(conf.get("spark.home"))
     } else if (System.getenv("SPARK_HOME") != null) {
       Some(System.getenv("SPARK_HOME"))
     } else {
@@ -909,6 +915,14 @@ object SparkContext {
 
   private[spark] val SPARK_UNKNOWN_USER = "<unknown>"
 
+  private lazy val conf = new SparkConf()
+
+  private[spark] def globalConf = {
+    if (SparkEnv.get != null) {
+      SparkEnv.get.conf
+    } else conf
+  }
+
   implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] {
     def addInPlace(t1: Double, t2: Double): Double = t1 + t2
     def zero(initialValue: Double) = 0.0
@@ -1020,7 +1034,7 @@ object SparkContext {
   /** Get the amount of memory per executor requested through system properties or SPARK_MEM */
   private[spark] val executorMemoryRequested = {
     // TODO: Might need to add some extra memory for the non-heap parts of the JVM
-    Option(System.getProperty("spark.executor.memory"))
+    Try(globalConf.get("spark.executor.memory")).toOption
       .orElse(Option(System.getenv("SPARK_MEM")))
       .map(Utils.memoryStringToMb)
       .getOrElse(512)
@@ -1123,7 +1137,7 @@ object SparkContext {
       case mesosUrl @ MESOS_REGEX(_) =>
         MesosNativeLibrary.load()
         val scheduler = new ClusterScheduler(sc)
-        val coarseGrained = System.getProperty("spark.mesos.coarse", "false").toBoolean
+        val coarseGrained = globalConf.getOrElse("spark.mesos.coarse",  "false").toBoolean
         val url = mesosUrl.stripPrefix("mesos://") // strip scheme from raw Mesos URLs
         val backend = if (coarseGrained) {
           new CoarseMesosSchedulerBackend(scheduler, sc, url, appName)
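
For existing applications, the auxiliary constructor added above keeps the old positional signature working by folding its arguments into a SparkConf before calling the primary constructor. Roughly, and with illustrative argument values (the paths, jar and app names are made up):

    import org.apache.spark.{SparkConf, SparkContext}

    // Old-style construction, still accepted:
    val scOld = new SparkContext("local[4]", "WordCount", "/opt/spark", Seq("app.jar"))

    // ...is approximately what the auxiliary constructor builds internally
    // (shown for equivalence; you would create only one context):
    val conf = new SparkConf(false)
      .setAppName("WordCount")
      .setMasterUrl("local[4]")
      .setJars(Seq("app.jar"))
      .setSparkHome("/opt/spark")
    val scNew = new SparkContext(conf)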

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2573add9/core/src/main/scala/org/apache/spark/SparkEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 826f5c2..78e4ae2 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -54,7 +54,8 @@ class SparkEnv (
     val connectionManager: ConnectionManager,
     val httpFileServer: HttpFileServer,
     val sparkFilesDir: String,
-    val metricsSystem: MetricsSystem) {
+    val metricsSystem: MetricsSystem,
+    val conf: SparkConf) {
 
   private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]()
 
@@ -114,25 +115,27 @@ object SparkEnv extends Logging {
       executorId: String,
       hostname: String,
       port: Int,
+      conf: SparkConf,
       isDriver: Boolean,
       isLocal: Boolean): SparkEnv = {
 
-    val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, port)
+    val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, port,
+      conf = conf)
 
     // Bit of a hack: If this is the driver and our port was 0 (meaning bind to any free port),
     // figure out which port number Akka actually bound to and set spark.driver.port to it.
     if (isDriver && port == 0) {
-      System.setProperty("spark.driver.port", boundPort.toString)
+      conf.set("spark.driver.port",  boundPort.toString)
     }
 
     // set only if unset until now.
-    if (System.getProperty("spark.hostPort", null) == null) {
+    if (conf.getOrElse("spark.hostPort",  null) == null) {
       if (!isDriver){
         // unexpected
         Utils.logErrorWithStack("Unexpected NOT to have spark.hostPort set")
       }
       Utils.checkHost(hostname)
-      System.setProperty("spark.hostPort", hostname + ":" + boundPort)
+      conf.set("spark.hostPort",  hostname + ":" + boundPort)
     }
 
     val classLoader = Thread.currentThread.getContextClassLoader
@@ -140,25 +143,25 @@ object SparkEnv extends Logging {
     // Create an instance of the class named by the given Java system property, or by
     // defaultClassName if the property is not set, and return it as a T
     def instantiateClass[T](propertyName: String, defaultClassName: String): T = {
-      val name = System.getProperty(propertyName, defaultClassName)
+      val name = conf.getOrElse(propertyName,  defaultClassName)
       Class.forName(name, true, classLoader).newInstance().asInstanceOf[T]
     }
 
     val serializerManager = new SerializerManager
 
     val serializer = serializerManager.setDefault(
-      System.getProperty("spark.serializer", "org.apache.spark.serializer.JavaSerializer"))
+      conf.getOrElse("spark.serializer",  "org.apache.spark.serializer.JavaSerializer"))
 
     val closureSerializer = serializerManager.get(
-      System.getProperty("spark.closure.serializer", "org.apache.spark.serializer.JavaSerializer"))
+      conf.getOrElse("spark.closure.serializer",  "org.apache.spark.serializer.JavaSerializer"))
 
     def registerOrLookup(name: String, newActor: => Actor): Either[ActorRef, ActorSelection] = {
       if (isDriver) {
         logInfo("Registering " + name)
         Left(actorSystem.actorOf(Props(newActor), name = name))
       } else {
-        val driverHost: String = System.getProperty("spark.driver.host", "localhost")
-        val driverPort: Int = System.getProperty("spark.driver.port", "7077").toInt
+        val driverHost: String = conf.getOrElse("spark.driver.host",  "localhost")
+        val driverPort: Int = conf.getOrElse("spark.driver.port",  "7077").toInt
         Utils.checkHost(driverHost, "Expected hostname")
         val url = "akka.tcp://spark@%s:%s/user/%s".format(driverHost, driverPort, name)
         logInfo("Connecting to " + name + ": " + url)
@@ -168,21 +171,21 @@ object SparkEnv extends Logging {
 
     val blockManagerMaster = new BlockManagerMaster(registerOrLookup(
       "BlockManagerMaster",
-      new BlockManagerMasterActor(isLocal)))
-    val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster, serializer)
+      new BlockManagerMasterActor(isLocal, conf)), conf)
+    val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster, serializer, conf)
 
     val connectionManager = blockManager.connectionManager
 
-    val broadcastManager = new BroadcastManager(isDriver)
+    val broadcastManager = new BroadcastManager(isDriver, conf)
 
     val cacheManager = new CacheManager(blockManager)
 
     // Have to assign trackerActor after initialization as MapOutputTrackerActor
     // requires the MapOutputTracker itself
     val mapOutputTracker =  if (isDriver) {
-      new MapOutputTrackerMaster()
+      new MapOutputTrackerMaster(conf)
     } else {
-      new MapOutputTracker()
+      new MapOutputTracker(conf)
     }
     mapOutputTracker.trackerActor = registerOrLookup(
       "MapOutputTracker",
@@ -193,12 +196,12 @@ object SparkEnv extends Logging {
 
     val httpFileServer = new HttpFileServer()
     httpFileServer.initialize()
-    System.setProperty("spark.fileserver.uri", httpFileServer.serverUri)
+    conf.set("spark.fileserver.uri",  httpFileServer.serverUri)
 
     val metricsSystem = if (isDriver) {
-      MetricsSystem.createMetricsSystem("driver")
+      MetricsSystem.createMetricsSystem("driver", conf)
     } else {
-      MetricsSystem.createMetricsSystem("executor")
+      MetricsSystem.createMetricsSystem("executor", conf)
     }
     metricsSystem.start()
 
@@ -212,7 +215,7 @@ object SparkEnv extends Logging {
     }
 
     // Warn about deprecated spark.cache.class property
-    if (System.getProperty("spark.cache.class") != null) {
+    if (conf.getOrElse("spark.cache.class", null) != null) {
       logWarning("The spark.cache.class property is no longer being used! Specify storage " +
         "levels using the RDD.persist() method instead.")
     }
@@ -231,6 +234,7 @@ object SparkEnv extends Logging {
       connectionManager,
       httpFileServer,
       sparkFilesDir,
-      metricsSystem)
+      metricsSystem,
+      conf)
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2573add9/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index ca42c76..d6eacfe 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -41,7 +41,7 @@ private[spark] class PythonRDD[T: ClassTag](
     accumulator: Accumulator[JList[Array[Byte]]])
   extends RDD[Array[Byte]](parent) {
 
-  val bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
+  val bufferSize = conf.getOrElse("spark.buffer.size",  "65536").toInt
 
   override def getPartitions = parent.partitions
 
@@ -247,10 +247,10 @@ private class BytesToString extends org.apache.spark.api.java.function.Function[
  */
 private class PythonAccumulatorParam(@transient serverHost: String, serverPort: Int)
   extends AccumulatorParam[JList[Array[Byte]]] {
-
+  import SparkContext.{globalConf => conf}
   Utils.checkHost(serverHost, "Expected hostname")
 
-  val bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
+  val bufferSize = conf.getOrElse("spark.buffer.size",  "65536").toInt
 
   override def zero(value: JList[Array[Byte]]): JList[Array[Byte]] = new JArrayList
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2573add9/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala
index 43c1829..be99d22 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala
@@ -32,7 +32,7 @@ abstract class Broadcast[T](private[spark] val id: Long) extends Serializable {
 }
 
 private[spark] 
-class BroadcastManager(val _isDriver: Boolean) extends Logging with Serializable {
+class BroadcastManager(val _isDriver: Boolean, conf: SparkConf) extends Logging with Serializable {
 
   private var initialized = false
   private var broadcastFactory: BroadcastFactory = null
@@ -43,14 +43,14 @@ class BroadcastManager(val _isDriver: Boolean) extends Logging with Serializable
   private def initialize() {
     synchronized {
       if (!initialized) {
-        val broadcastFactoryClass = System.getProperty(
+        val broadcastFactoryClass = conf.getOrElse(
           "spark.broadcast.factory", "org.apache.spark.broadcast.HttpBroadcastFactory")
 
         broadcastFactory =
           Class.forName(broadcastFactoryClass).newInstance.asInstanceOf[BroadcastFactory]
 
         // Initialize appropriate BroadcastFactory and BroadcastObject
-        broadcastFactory.initialize(isDriver)
+        broadcastFactory.initialize(isDriver, conf)
 
         initialized = true
       }
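
Because the factory class name is now read from the conf rather than a raw system property, switching broadcast implementations becomes a conf setting. A hedged sketch (class names as they appear in this diff; the default remains HttpBroadcastFactory):

    import org.apache.spark.SparkConf

    // BroadcastManager instantiates the named class reflectively and then
    // calls initialize(isDriver, conf) on it, as shown in the hunk above.
    val conf = new SparkConf()
      .set("spark.broadcast.factory", "org.apache.spark.broadcast.TorrentBroadcastFactory")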

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2573add9/core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala b/core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala
index 68bff75..fb161ce 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.broadcast
 
+import org.apache.spark.SparkConf
+
 /**
  * An interface for all the broadcast implementations in Spark (to allow 
  * multiple broadcast implementations). SparkContext uses a user-specified
@@ -24,7 +26,7 @@ package org.apache.spark.broadcast
  * entire Spark job.
  */
 private[spark] trait BroadcastFactory {
-  def initialize(isDriver: Boolean): Unit
+  def initialize(isDriver: Boolean, conf: SparkConf): Unit
   def newBroadcast[T](value: T, isLocal: Boolean, id: Long): Broadcast[T]
   def stop(): Unit
 }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2573add9/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
index 47db720..cecb8c2 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
@@ -24,7 +24,7 @@ import java.util.concurrent.TimeUnit
 import it.unimi.dsi.fastutil.io.FastBufferedInputStream
 import it.unimi.dsi.fastutil.io.FastBufferedOutputStream
 
-import org.apache.spark.{HttpServer, Logging, SparkEnv}
+import org.apache.spark.{SparkConf, HttpServer, Logging, SparkEnv}
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.storage.{BroadcastBlockId, StorageLevel}
 import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, TimeStampedHashSet, Utils}
@@ -64,7 +64,7 @@ private[spark] class HttpBroadcast[T](@transient var value_ : T, isLocal: Boolea
 }
 
 private[spark] class HttpBroadcastFactory extends BroadcastFactory {
-  def initialize(isDriver: Boolean) { HttpBroadcast.initialize(isDriver) }
+  def initialize(isDriver: Boolean, conf: SparkConf) { HttpBroadcast.initialize(isDriver, conf) }
 
   def newBroadcast[T](value_ : T, isLocal: Boolean, id: Long) =
     new HttpBroadcast[T](value_, isLocal, id)
@@ -88,15 +88,16 @@ private object HttpBroadcast extends Logging {
 
   private lazy val compressionCodec = CompressionCodec.createCodec()
 
-  def initialize(isDriver: Boolean) {
+  def initialize(isDriver: Boolean, conf: SparkConf) {
     synchronized {
       if (!initialized) {
-        bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
-        compress = System.getProperty("spark.broadcast.compress", "true").toBoolean
+        bufferSize = conf.getOrElse("spark.buffer.size",  "65536").toInt
+        compress = conf.getOrElse("spark.broadcast.compress",  "true").toBoolean
         if (isDriver) {
           createServer()
+          conf.set("spark.httpBroadcast.uri",  serverUri)
         }
-        serverUri = System.getProperty("spark.httpBroadcast.uri")
+        serverUri = conf.get("spark.httpBroadcast.uri")
         initialized = true
       }
     }
@@ -118,7 +119,6 @@ private object HttpBroadcast extends Logging {
     server = new HttpServer(broadcastDir)
     server.start()
     serverUri = server.uri
-    System.setProperty("spark.httpBroadcast.uri", serverUri)
     logInfo("Broadcast server started at " + serverUri)
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2573add9/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
index 073a0a5..4a3801d 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -166,8 +166,9 @@ private object TorrentBroadcast
 extends Logging {
 
   private var initialized = false
-
-  def initialize(_isDriver: Boolean) {
+  private var conf: SparkConf = null
+  def initialize(_isDriver: Boolean, conf: SparkConf) {
+    TorrentBroadcast.conf = conf //TODO: we might have to fix it in tests
     synchronized {
       if (!initialized) {
         initialized = true
@@ -179,7 +180,7 @@ extends Logging {
     initialized = false
   }
 
-  val BLOCK_SIZE = System.getProperty("spark.broadcast.blockSize", "4096").toInt * 1024
+  lazy val BLOCK_SIZE = conf.getOrElse("spark.broadcast.blockSize",  "4096").toInt * 1024
   
   def blockifyObject[T](obj: T): TorrentInfo = {
     val byteArray = Utils.serialize[T](obj)
@@ -238,7 +239,7 @@ private[spark] case class TorrentInfo(
 private[spark] class TorrentBroadcastFactory
   extends BroadcastFactory {
   
-  def initialize(isDriver: Boolean) { TorrentBroadcast.initialize(isDriver) }
+  def initialize(isDriver: Boolean, conf: SparkConf) { TorrentBroadcast.initialize(isDriver, conf) }
 
   def newBroadcast[T](value_ : T, isLocal: Boolean, id: Long) =
     new TorrentBroadcast[T](value_, isLocal, id)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2573add9/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
index 19d393a..dda43dc 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
@@ -26,7 +26,7 @@ private[spark] class ApplicationDescription(
     val appUiUrl: String)
   extends Serializable {
 
-  val user = System.getProperty("user.name", "<unknown>")
+  val user = System.getProperty("user.name",  "<unknown>")
 
   override def toString: String = "ApplicationDescription(" + name + ")"
 }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2573add9/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index fc1537f..1c979ac 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -67,8 +67,9 @@ class SparkHadoopUtil {
 }
 
 object SparkHadoopUtil {
+  import SparkContext.{globalConf => conf}
   private val hadoop = {
-    val yarnMode = java.lang.Boolean.valueOf(System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE")))
+    val yarnMode = java.lang.Boolean.valueOf(conf.getOrElse("SPARK_YARN_MODE",  System.getenv("SPARK_YARN_MODE")))
     if (yarnMode) {
       try {
         Class.forName("org.apache.spark.deploy.yarn.YarnSparkHadoopUtil").newInstance.asInstanceOf[SparkHadoopUtil]

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2573add9/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
index 953755e..9bbd635 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
@@ -19,20 +19,18 @@ package org.apache.spark.deploy.client
 
 import java.util.concurrent.TimeoutException
 
-import scala.concurrent.duration._
 import scala.concurrent.Await
+import scala.concurrent.duration._
 
 import akka.actor._
 import akka.pattern.ask
-import akka.remote.{RemotingLifecycleEvent, DisassociatedEvent}
-
-import org.apache.spark.{SparkException, Logging}
+import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
+import org.apache.spark.{Logging, SparkConf, SparkException}
 import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
 import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.deploy.master.Master
 import org.apache.spark.util.AkkaUtils
 
-
 /**
  * The main class used to talk to a Spark deploy cluster. Takes a master URL, an app description,
  * and a listener for cluster events, and calls back the listener when various events occur.
@@ -43,7 +41,8 @@ private[spark] class Client(
     actorSystem: ActorSystem,
     masterUrls: Array[String],
     appDescription: ApplicationDescription,
-    listener: ClientListener)
+    listener: ClientListener,
+    conf: SparkConf)
   extends Logging {
 
   val REGISTRATION_TIMEOUT = 20.seconds
@@ -178,7 +177,7 @@ private[spark] class Client(
   def stop() {
     if (actor != null) {
       try {
-        val timeout = AkkaUtils.askTimeout
+        val timeout = AkkaUtils.askTimeout(conf)
         val future = actor.ask(StopClient)(timeout)
         Await.result(future, timeout)
       } catch {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2573add9/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
index 5b62d3b..426cf52 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.deploy.client
 
 import org.apache.spark.util.{Utils, AkkaUtils}
-import org.apache.spark.{Logging}
+import org.apache.spark.{SparkContext, Logging}
 import org.apache.spark.deploy.{Command, ApplicationDescription}
 
 private[spark] object TestClient {
@@ -45,11 +45,12 @@ private[spark] object TestClient {
 
   def main(args: Array[String]) {
     val url = args(0)
-    val (actorSystem, port) = AkkaUtils.createActorSystem("spark", Utils.localIpAddress, 0)
+    val (actorSystem, port) = AkkaUtils.createActorSystem("spark", Utils.localIpAddress, 0,
+      conf = SparkContext.globalConf)
     val desc = new ApplicationDescription(
       "TestClient", 1, 512, Command("spark.deploy.client.TestExecutor", Seq(), Map()), "dummy-spark-home", "ignored")
     val listener = new TestListener
-    val client = new Client(actorSystem, Array(url), desc, listener)
+    val client = new Client(actorSystem, Array(url), desc, listener, SparkContext.globalConf)
     client.start()
     actorSystem.awaitTermination()
   }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2573add9/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index eebd079..2c162c4 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -29,7 +29,7 @@ import akka.pattern.ask
 import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
 import akka.serialization.SerializationExtension
 
-import org.apache.spark.{Logging, SparkException}
+import org.apache.spark.{SparkContext, Logging, SparkException}
 import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
 import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.deploy.master.MasterMessages._
@@ -39,13 +39,13 @@ import org.apache.spark.util.{AkkaUtils, Utils}
 
 private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging {
   import context.dispatcher
-
+  val conf = SparkContext.globalConf
   val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss")  // For application IDs
-  val WORKER_TIMEOUT = System.getProperty("spark.worker.timeout", "60").toLong * 1000
-  val RETAINED_APPLICATIONS = System.getProperty("spark.deploy.retainedApplications", "200").toInt
-  val REAPER_ITERATIONS = System.getProperty("spark.dead.worker.persistence", "15").toInt
-  val RECOVERY_DIR = System.getProperty("spark.deploy.recoveryDirectory", "")
-  val RECOVERY_MODE = System.getProperty("spark.deploy.recoveryMode", "NONE")
+  val WORKER_TIMEOUT = conf.getOrElse("spark.worker.timeout",  "60").toLong * 1000
+  val RETAINED_APPLICATIONS = conf.getOrElse("spark.deploy.retainedApplications",  "200").toInt
+  val REAPER_ITERATIONS = conf.getOrElse("spark.dead.worker.persistence",  "15").toInt
+  val RECOVERY_DIR = conf.getOrElse("spark.deploy.recoveryDirectory",  "")
+  val RECOVERY_MODE = conf.getOrElse("spark.deploy.recoveryMode",  "NONE")
 
   var nextAppNumber = 0
   val workers = new HashSet[WorkerInfo]
@@ -63,8 +63,8 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
 
   Utils.checkHost(host, "Expected hostname")
 
-  val masterMetricsSystem = MetricsSystem.createMetricsSystem("master")
-  val applicationMetricsSystem = MetricsSystem.createMetricsSystem("applications")
+  val masterMetricsSystem = MetricsSystem.createMetricsSystem("master", conf)
+  val applicationMetricsSystem = MetricsSystem.createMetricsSystem("applications", conf)
   val masterSource = new MasterSource(this)
 
   val webUi = new MasterWebUI(this, webUiPort)
@@ -86,7 +86,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
   // As a temporary workaround before better ways of configuring memory, we allow users to set
   // a flag that will perform round-robin scheduling across the nodes (spreading out each app
   // among all the nodes) instead of trying to consolidate each app onto a small # of nodes.
-  val spreadOutApps = System.getProperty("spark.deploy.spreadOut", "true").toBoolean
+  val spreadOutApps = conf.getOrElse("spark.deploy.spreadOut",  "true").toBoolean
 
   override def preStart() {
     logInfo("Starting Spark master at " + masterUrl)
@@ -103,7 +103,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
     persistenceEngine = RECOVERY_MODE match {
       case "ZOOKEEPER" =>
         logInfo("Persisting recovery state to ZooKeeper")
-        new ZooKeeperPersistenceEngine(SerializationExtension(context.system))
+        new ZooKeeperPersistenceEngine(SerializationExtension(context.system), conf)
       case "FILESYSTEM" =>
         logInfo("Persisting recovery state to directory: " + RECOVERY_DIR)
         new FileSystemPersistenceEngine(RECOVERY_DIR, SerializationExtension(context.system))
@@ -113,7 +113,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
 
     leaderElectionAgent = RECOVERY_MODE match {
         case "ZOOKEEPER" =>
-          context.actorOf(Props(classOf[ZooKeeperLeaderElectionAgent], self, masterUrl))
+          context.actorOf(Props(classOf[ZooKeeperLeaderElectionAgent], self, masterUrl, conf))
         case _ =>
           context.actorOf(Props(classOf[MonarchyLeaderAgent], self))
       }
@@ -507,7 +507,7 @@ private[spark] object Master {
   val sparkUrlRegex = "spark://([^:]+):([0-9]+)".r
 
   def main(argStrings: Array[String]) {
-    val args = new MasterArguments(argStrings)
+    val args = new MasterArguments(argStrings, SparkContext.globalConf)
     val (actorSystem, _, _) = startSystemAndActor(args.host, args.port, args.webUiPort)
     actorSystem.awaitTermination()
   }
@@ -523,9 +523,10 @@ private[spark] object Master {
   }
 
   def startSystemAndActor(host: String, port: Int, webUiPort: Int): (ActorSystem, Int, Int) = {
-    val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port)
+    val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port,
+      conf = SparkContext.globalConf)
     val actor = actorSystem.actorOf(Props(classOf[Master], host, boundPort, webUiPort), actorName)
-    val timeout = AkkaUtils.askTimeout
+    val timeout = AkkaUtils.askTimeout(SparkContext.globalConf)
     val respFuture = actor.ask(RequestWebUIPort)(timeout)
     val resp = Await.result(respFuture, timeout).asInstanceOf[WebUIPortResponse]
     (actorSystem, boundPort, resp.webUIBoundPort)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2573add9/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala b/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala
index 9d89b45..7ce83f9 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala
@@ -18,11 +18,12 @@
 package org.apache.spark.deploy.master
 
 import org.apache.spark.util.{Utils, IntParam}
+import org.apache.spark.SparkConf
 
 /**
  * Command-line parser for the master.
  */
-private[spark] class MasterArguments(args: Array[String]) {
+private[spark] class MasterArguments(args: Array[String], conf: SparkConf) {
   var host = Utils.localHostName()
   var port = 7077
   var webUiPort = 8080
@@ -37,8 +38,8 @@ private[spark] class MasterArguments(args: Array[String]) {
   if (System.getenv("SPARK_MASTER_WEBUI_PORT") != null) {
     webUiPort = System.getenv("SPARK_MASTER_WEBUI_PORT").toInt
   }
-  if (System.getProperty("master.ui.port") != null) {
-    webUiPort = System.getProperty("master.ui.port").toInt
+  if (conf.get("master.ui.port") != null) {
+    webUiPort = conf.get("master.ui.port").toInt
   }
 
   parse(args.toList)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2573add9/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala b/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala
index 6cc7fd2..79d95b1 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala
@@ -23,7 +23,7 @@ import org.apache.zookeeper._
 import org.apache.zookeeper.Watcher.Event.KeeperState
 import org.apache.zookeeper.data.Stat
 
-import org.apache.spark.Logging
+import org.apache.spark.{SparkConf, Logging}
 
 /**
  * Provides a Scala-side interface to the standard ZooKeeper client, with the addition of retry
@@ -35,8 +35,9 @@ import org.apache.spark.Logging
  * Additionally, all commands sent to ZooKeeper will be retried until they either fail too many
  * times or a semantic exception is thrown (e.g., "node already exists").
  */
-private[spark] class SparkZooKeeperSession(zkWatcher: SparkZooKeeperWatcher) extends Logging {
-  val ZK_URL = System.getProperty("spark.deploy.zookeeper.url", "")
+private[spark] class SparkZooKeeperSession(zkWatcher: SparkZooKeeperWatcher,
+    conf: SparkConf) extends Logging {
+  val ZK_URL = conf.getOrElse("spark.deploy.zookeeper.url",  "")
 
   val ZK_ACL = ZooDefs.Ids.OPEN_ACL_UNSAFE
   val ZK_TIMEOUT_MILLIS = 30000

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2573add9/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
index 7d535b0..df5bb36 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
@@ -21,16 +21,17 @@ import akka.actor.ActorRef
 import org.apache.zookeeper._
 import org.apache.zookeeper.Watcher.Event.EventType
 
-import org.apache.spark.Logging
+import org.apache.spark.{SparkConf, Logging}
 import org.apache.spark.deploy.master.MasterMessages._
 
-private[spark] class ZooKeeperLeaderElectionAgent(val masterActor: ActorRef, masterUrl: String)
+private[spark] class ZooKeeperLeaderElectionAgent(val masterActor: ActorRef,
+    masterUrl: String, conf: SparkConf)
   extends LeaderElectionAgent with SparkZooKeeperWatcher with Logging  {
 
-  val WORKING_DIR = System.getProperty("spark.deploy.zookeeper.dir", "/spark") + "/leader_election"
+  val WORKING_DIR = conf.getOrElse("spark.deploy.zookeeper.dir",  "/spark") + "/leader_election"
 
   private val watcher = new ZooKeeperWatcher()
-  private val zk = new SparkZooKeeperSession(this)
+  private val zk = new SparkZooKeeperSession(this, conf)
   private var status = LeadershipStatus.NOT_LEADER
   private var myLeaderFile: String = _
   private var leaderUrl: String = _

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2573add9/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
index 825344b..c55b720 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
@@ -17,19 +17,19 @@
 
 package org.apache.spark.deploy.master
 
-import org.apache.spark.Logging
+import org.apache.spark.{SparkConf, Logging}
 import org.apache.zookeeper._
 
 import akka.serialization.Serialization
 
-class ZooKeeperPersistenceEngine(serialization: Serialization)
+class ZooKeeperPersistenceEngine(serialization: Serialization, conf: SparkConf)
   extends PersistenceEngine
   with SparkZooKeeperWatcher
   with Logging
 {
-  val WORKING_DIR = System.getProperty("spark.deploy.zookeeper.dir", "/spark") + "/master_status"
+  val WORKING_DIR = conf.getOrElse("spark.deploy.zookeeper.dir",  "/spark") + "/master_status"
 
-  val zk = new SparkZooKeeperSession(this)
+  val zk = new SparkZooKeeperSession(this, conf)
 
   zk.connect()
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2573add9/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
index 9ab594b..ead3566 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
@@ -31,7 +31,7 @@ import org.apache.spark.util.{AkkaUtils, Utils}
  */
 private[spark]
 class MasterWebUI(val master: Master, requestedPort: Int) extends Logging {
-  val timeout = AkkaUtils.askTimeout
+  val timeout = AkkaUtils.askTimeout(master.conf)
   val host = Utils.localHostName()
   val port = requestedPort
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2573add9/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 87531b6..75a6e75 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -25,23 +25,14 @@ import scala.collection.mutable.HashMap
 import scala.concurrent.duration._
 
 import akka.actor._
-import akka.remote.{ DisassociatedEvent, RemotingLifecycleEvent}
-
-import org.apache.spark.{SparkException, Logging}
+import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
+import org.apache.spark.{Logging, SparkConf, SparkException}
 import org.apache.spark.deploy.{ExecutorDescription, ExecutorState}
 import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.deploy.master.Master
 import org.apache.spark.deploy.worker.ui.WorkerWebUI
 import org.apache.spark.metrics.MetricsSystem
-import org.apache.spark.util.{Utils, AkkaUtils}
-import org.apache.spark.deploy.DeployMessages.WorkerStateResponse
-import org.apache.spark.deploy.DeployMessages.RegisterWorkerFailed
-import org.apache.spark.deploy.DeployMessages.KillExecutor
-import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged
-import org.apache.spark.deploy.DeployMessages.Heartbeat
-import org.apache.spark.deploy.DeployMessages.RegisteredWorker
-import org.apache.spark.deploy.DeployMessages.LaunchExecutor
-import org.apache.spark.deploy.DeployMessages.RegisterWorker
+import org.apache.spark.util.{AkkaUtils, Utils}
 
 /**
   * @param masterUrls Each url should look like spark://host:port.
@@ -53,7 +44,8 @@ private[spark] class Worker(
     cores: Int,
     memory: Int,
     masterUrls: Array[String],
-    workDirPath: String = null)
+    workDirPath: String = null,
+    val conf: SparkConf)
   extends Actor with Logging {
   import context.dispatcher
 
@@ -63,7 +55,7 @@ private[spark] class Worker(
   val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss")  // For worker and executor IDs
 
   // Send a heartbeat every (heartbeat timeout) / 4 milliseconds
-  val HEARTBEAT_MILLIS = System.getProperty("spark.worker.timeout", "60").toLong * 1000 / 4
+  val HEARTBEAT_MILLIS = conf.getOrElse("spark.worker.timeout",  "60").toLong * 1000 / 4
 
   val REGISTRATION_TIMEOUT = 20.seconds
   val REGISTRATION_RETRIES = 3
@@ -92,7 +84,7 @@ private[spark] class Worker(
   var coresUsed = 0
   var memoryUsed = 0
 
-  val metricsSystem = MetricsSystem.createMetricsSystem("worker")
+  val metricsSystem = MetricsSystem.createMetricsSystem("worker", conf)
   val workerSource = new WorkerSource(this)
 
   def coresFree: Int = cores - coresUsed
@@ -275,6 +267,7 @@ private[spark] class Worker(
 }
 
 private[spark] object Worker {
+  import org.apache.spark.SparkContext.globalConf
   def main(argStrings: Array[String]) {
     val args = new WorkerArguments(argStrings)
     val (actorSystem, _) = startSystemAndActor(args.host, args.port, args.webUiPort, args.cores,
@@ -287,9 +280,10 @@ private[spark] object Worker {
     : (ActorSystem, Int) = {
     // The LocalSparkCluster runs multiple local sparkWorkerX actor systems
     val systemName = "sparkWorker" + workerNumber.map(_.toString).getOrElse("")
-    val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port)
+    val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port,
+      conf = globalConf)
     actorSystem.actorOf(Props(classOf[Worker], host, boundPort, webUiPort, cores, memory,
-      masterUrls, workDir), name = "Worker")
+      masterUrls, workDir, globalConf), name = "Worker")
     (actorSystem, boundPort)
   }
 

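The heartbeat interval above is now derived from the worker's conf rather than a system property: one quarter of spark.worker.timeout (given in seconds), expressed in milliseconds. A small sketch of that arithmetic, using the same getOrElse call as the hunk:

    import org.apache.spark.SparkConf

    object HeartbeatSketch {
      // Same formula as HEARTBEAT_MILLIS above.
      def heartbeatMillis(conf: SparkConf): Long =
        conf.getOrElse("spark.worker.timeout", "60").toLong * 1000 / 4

      def main(args: Array[String]) {
        val conf = new SparkConf(false)
        conf.set("spark.worker.timeout", "120")
        println(heartbeatMillis(conf))   // 30000 ms, one quarter of 120 s
      }
    }
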
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2573add9/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
index 40d6bdb..ec47ba1 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
@@ -22,7 +22,7 @@ import java.io.File
 import javax.servlet.http.HttpServletRequest
 import org.eclipse.jetty.server.{Handler, Server}
 
-import org.apache.spark.Logging
+import org.apache.spark.{Logging, SparkConf}
 import org.apache.spark.deploy.worker.Worker
 import org.apache.spark.ui.{JettyUtils, UIUtils}
 import org.apache.spark.ui.JettyUtils._
@@ -34,10 +34,10 @@ import org.apache.spark.util.{AkkaUtils, Utils}
 private[spark]
 class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[Int] = None)
   extends Logging {
-  val timeout = AkkaUtils.askTimeout
+  val timeout = AkkaUtils.askTimeout(worker.conf)
   val host = Utils.localHostName()
   val port = requestedPort.getOrElse(
-    System.getProperty("worker.ui.port", WorkerWebUI.DEFAULT_PORT).toInt)
+    worker.conf.getOrElse("worker.ui.port",  WorkerWebUI.DEFAULT_PORT).toInt)
 
   var server: Option[Server] = None
   var boundPort: Option[Int] = None

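The worker web UI port now comes from the worker's conf when no explicit port is requested. A short sketch of that two-level fallback (requested port, then conf key, then a built-in default); the 8081 value here is only a placeholder for WorkerWebUI.DEFAULT_PORT:

    import org.apache.spark.SparkConf

    object UiPortSketch {
      def uiPort(conf: SparkConf, requestedPort: Option[Int]): Int =
        requestedPort.getOrElse(conf.getOrElse("worker.ui.port", "8081").toInt)

      def main(args: Array[String]) {
        val conf = new SparkConf(false)
        conf.set("worker.ui.port", "9090")
        println(uiPort(conf, None))        // 9090, taken from the conf
        println(uiPort(conf, Some(4040)))  // 4040, an explicit request wins
      }
    }
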
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2573add9/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index debbdd4..c8319f6 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -22,7 +22,7 @@ import java.nio.ByteBuffer
 import akka.actor._
 import akka.remote._
 
-import org.apache.spark.Logging
+import org.apache.spark.{SparkContext, Logging}
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
 import org.apache.spark.util.{Utils, AkkaUtils}
@@ -98,10 +98,10 @@ private[spark] object CoarseGrainedExecutorBackend {
     // Create a new ActorSystem to run the backend, because we can't create a SparkEnv / Executor
     // before getting started with all our system properties, etc
     val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0,
-      indestructible = true)
+      indestructible = true, conf = SparkContext.globalConf)
     // set it
     val sparkHostPort = hostname + ":" + boundPort
-    System.setProperty("spark.hostPort", sparkHostPort)
+//    conf.set("spark.hostPort",  sparkHostPort)
     actorSystem.actorOf(
       Props(classOf[CoarseGrainedExecutorBackend], driverUrl, executorId, sparkHostPort, cores),
       name = "Executor")

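The executor backend now hands a conf to AkkaUtils.createActorSystem explicitly. A schematic usage of that call follows; since AkkaUtils and SparkContext.globalConf are spark-internal, the sketch assumes it lives under the org.apache.spark package, and the host name is made up.

    package org.apache.spark.sketches

    import org.apache.spark.SparkContext
    import org.apache.spark.util.AkkaUtils

    object ExecutorSystemSketch {
      def main(args: Array[String]) {
        // Same named arguments as in the hunk above: an indestructible actor system,
        // configured from the explicit conf rather than from system properties.
        val (actorSystem, boundPort) = AkkaUtils.createActorSystem(
          "sparkExecutor", "localhost", 0, indestructible = true, conf = SparkContext.globalConf)
        println("sparkExecutor bound to port " + boundPort)
        actorSystem.shutdown()
      }
    }
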
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2573add9/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 0f19d7a..70fc30e 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -57,17 +57,17 @@ private[spark] class Executor(
 
   // Make sure the local hostname we report matches the cluster scheduler's name for this host
   Utils.setCustomHostname(slaveHostname)
-
+  val conf = new SparkConf(false)
   // Set spark.* system properties from executor arg
   for ((key, value) <- properties) {
-    System.setProperty(key, value)
+    conf.set(key,  value)
   }
 
   // If we are in yarn mode, systems can have different disk layouts so we must set it
   // to what Yarn on this system said was available. This will be used later when SparkEnv
   // created.
   if (java.lang.Boolean.valueOf(System.getenv("SPARK_YARN_MODE"))) {
-    System.setProperty("spark.local.dir", getYarnLocalDirs())
+    conf.set("spark.local.dir",  getYarnLocalDirs())
   }
 
   // Create our ClassLoader and set it on this thread
@@ -108,7 +108,7 @@ private[spark] class Executor(
   // Initialize Spark environment (using system properties read above)
   private val env = {
     if (!isLocal) {
-      val _env = SparkEnv.createFromSystemProperties(executorId, slaveHostname, 0,
+      val _env = SparkEnv.createFromSystemProperties(executorId, slaveHostname, 0, conf,
         isDriver = false, isLocal = false)
       SparkEnv.set(_env)
       _env.metricsSystem.registerSource(executorSource)
@@ -303,7 +303,7 @@ private[spark] class Executor(
    * new classes defined by the REPL as the user types code
    */
   private def addReplClassLoaderIfNeeded(parent: ClassLoader): ClassLoader = {
-    val classUri = System.getProperty("spark.repl.class.uri")
+    val classUri = conf.getOrElse("spark.repl.class.uri", null)
     if (classUri != null) {
       logInfo("Using REPL class URI: " + classUri)
       try {

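The executor now builds its own SparkConf instead of mutating system properties: it starts from a SparkConf constructed with false, so it begins empty, and copies in the spark.* properties shipped from the driver. A minimal sketch of that pattern; the property list here is invented.

    import org.apache.spark.SparkConf

    object ExecutorConfSketch {
      def main(args: Array[String]) {
        // Pretend these key/value pairs arrived from the driver with the executor args.
        val driverProperties = Seq(
          "spark.app.name" -> "sketch",
          "spark.local.dir" -> "/tmp/spark-local")

        val conf = new SparkConf(false)   // start empty, mirroring new SparkConf(false) above
        for ((key, value) <- driverProperties) {
          conf.set(key, value)
        }
        println(conf.getOrElse("spark.local.dir", "/tmp"))   // /tmp/spark-local
      }
    }

Keeping these settings on a per-executor conf means they no longer leak into other code running in the same JVM through System.setProperty.
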
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2573add9/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
index 570a979..8ef5019 100644
--- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
+++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
@@ -22,6 +22,7 @@ import java.io.{InputStream, OutputStream}
 import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
 
 import org.xerial.snappy.{SnappyInputStream, SnappyOutputStream}
+import org.apache.spark.SparkConf
 
 
 /**
@@ -37,15 +38,16 @@ trait CompressionCodec {
 
 
 private[spark] object CompressionCodec {
-
+  import org.apache.spark.SparkContext.globalConf
   def createCodec(): CompressionCodec = {
     createCodec(System.getProperty(
       "spark.io.compression.codec", classOf[LZFCompressionCodec].getName))
   }
 
   def createCodec(codecName: String): CompressionCodec = {
-    Class.forName(codecName, true, Thread.currentThread.getContextClassLoader)
-      .newInstance().asInstanceOf[CompressionCodec]
+    val ctor = Class.forName(codecName, true, Thread.currentThread.getContextClassLoader)
+      .getConstructor(classOf[SparkConf])
+      ctor.newInstance(globalConf).asInstanceOf[CompressionCodec]
   }
 }
 
@@ -53,7 +55,7 @@ private[spark] object CompressionCodec {
 /**
  * LZF implementation of [[org.apache.spark.io.CompressionCodec]].
  */
-class LZFCompressionCodec extends CompressionCodec {
+class LZFCompressionCodec(conf: SparkConf) extends CompressionCodec {
 
   override def compressedOutputStream(s: OutputStream): OutputStream = {
     new LZFOutputStream(s).setFinishBlockOnFlush(true)
@@ -67,10 +69,10 @@ class LZFCompressionCodec extends CompressionCodec {
  * Snappy implementation of [[org.apache.spark.io.CompressionCodec]].
  * Block size can be configured by spark.io.compression.snappy.block.size.
  */
-class SnappyCompressionCodec extends CompressionCodec {
+class SnappyCompressionCodec(conf: SparkConf) extends CompressionCodec {
 
   override def compressedOutputStream(s: OutputStream): OutputStream = {
-    val blockSize = System.getProperty("spark.io.compression.snappy.block.size", "32768").toInt
+    val blockSize = conf.getOrElse("spark.io.compression.snappy.block.size",  "32768").toInt
     new SnappyOutputStream(s, blockSize)
   }
 

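Every codec now takes a SparkConf in its constructor, and createCodec instantiates it reflectively through that constructor. A compact sketch of the same reflective call; the block-size override is just an example value.

    import org.apache.spark.SparkConf
    import org.apache.spark.io.{CompressionCodec, LZFCompressionCodec}

    object CodecSketch {
      def main(args: Array[String]) {
        val conf = new SparkConf(false)
        conf.set("spark.io.compression.snappy.block.size", "65536")

        // Look up the (SparkConf) constructor and pass the conf through, as createCodec does.
        val codecName = classOf[LZFCompressionCodec].getName
        val ctor = Class.forName(codecName, true, Thread.currentThread.getContextClassLoader)
          .getConstructor(classOf[SparkConf])
        val codec = ctor.newInstance(conf).asInstanceOf[CompressionCodec]
        println(codec.getClass.getSimpleName)   // LZFCompressionCodec
      }
    }
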
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2573add9/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
index bec0c83..ac29816 100644
--- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
@@ -24,7 +24,7 @@ import java.util.concurrent.TimeUnit
 
 import scala.collection.mutable
 
-import org.apache.spark.Logging
+import org.apache.spark.{SparkConf, Logging}
 import org.apache.spark.metrics.sink.{MetricsServlet, Sink}
 import org.apache.spark.metrics.source.Source
 
@@ -62,10 +62,11 @@ import org.apache.spark.metrics.source.Source
  *
  * [options] is the specific property of this source or sink.
  */
-private[spark] class MetricsSystem private (val instance: String) extends Logging {
+private[spark] class MetricsSystem private (val instance: String,
+    conf: SparkConf) extends Logging {
   initLogging()
 
-  val confFile = System.getProperty("spark.metrics.conf")
+  val confFile = conf.getOrElse("spark.metrics.conf", null)
   val metricsConfig = new MetricsConfig(Option(confFile))
 
   val sinks = new mutable.ArrayBuffer[Sink]
@@ -159,5 +160,6 @@ private[spark] object MetricsSystem {
     }
   }
 
-  def createMetricsSystem(instance: String): MetricsSystem = new MetricsSystem(instance)
+  def createMetricsSystem(instance: String, conf: SparkConf): MetricsSystem =
+    new MetricsSystem(instance, conf)
 }

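spark.metrics.conf has no sensible default, so the hunk above falls back to null and MetricsConfig wraps the value in Option. A short sketch of that optional-key lookup using the same getOrElse call:

    import org.apache.spark.SparkConf

    object MetricsConfSketch {
      def main(args: Array[String]) {
        val conf = new SparkConf(false)
        // No default metrics properties file: an absent key becomes None.
        val confFile = Option(conf.getOrElse("spark.metrics.conf", null))
        println(confFile.getOrElse("<no metrics config file; built-in defaults apply>"))
      }
    }
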
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2573add9/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
index 703bc6a..3e902f8 100644
--- a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
+++ b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
@@ -37,7 +37,7 @@ import scala.concurrent.duration._
 
 import org.apache.spark.util.Utils
 
-private[spark] class ConnectionManager(port: Int) extends Logging {
+private[spark] class ConnectionManager(port: Int, conf: SparkConf) extends Logging {
 
   class MessageStatus(
       val message: Message,
@@ -54,22 +54,22 @@ private[spark] class ConnectionManager(port: Int) extends Logging {
   private val selector = SelectorProvider.provider.openSelector()
 
   private val handleMessageExecutor = new ThreadPoolExecutor(
-    System.getProperty("spark.core.connection.handler.threads.min","20").toInt,
-    System.getProperty("spark.core.connection.handler.threads.max","60").toInt,
-    System.getProperty("spark.core.connection.handler.threads.keepalive","60").toInt, TimeUnit.SECONDS,
+    conf.getOrElse("spark.core.connection.handler.threads.min", "20").toInt,
+    conf.getOrElse("spark.core.connection.handler.threads.max", "60").toInt,
+    conf.getOrElse("spark.core.connection.handler.threads.keepalive", "60").toInt, TimeUnit.SECONDS,
     new LinkedBlockingDeque[Runnable]())
 
   private val handleReadWriteExecutor = new ThreadPoolExecutor(
-    System.getProperty("spark.core.connection.io.threads.min","4").toInt,
-    System.getProperty("spark.core.connection.io.threads.max","32").toInt,
-    System.getProperty("spark.core.connection.io.threads.keepalive","60").toInt, TimeUnit.SECONDS,
+    conf.getOrElse("spark.core.connection.io.threads.min", "4").toInt,
+    conf.getOrElse("spark.core.connection.io.threads.max", "32").toInt,
+    conf.getOrElse("spark.core.connection.io.threads.keepalive", "60").toInt, TimeUnit.SECONDS,
     new LinkedBlockingDeque[Runnable]())
 
   // Use a different, yet smaller, thread pool - infrequently used with very short lived tasks : which should be executed asap
   private val handleConnectExecutor = new ThreadPoolExecutor(
-    System.getProperty("spark.core.connection.connect.threads.min","1").toInt,
-    System.getProperty("spark.core.connection.connect.threads.max","8").toInt,
-    System.getProperty("spark.core.connection.connect.threads.keepalive","60").toInt, TimeUnit.SECONDS,
+    conf.getOrElse("spark.core.connection.connect.threads.min", "1").toInt,
+    conf.getOrElse("spark.core.connection.connect.threads.max", "8").toInt,
+    conf.getOrElse("spark.core.connection.connect.threads.keepalive", "60").toInt, TimeUnit.SECONDS,
     new LinkedBlockingDeque[Runnable]())
 
   private val serverChannel = ServerSocketChannel.open()
@@ -593,8 +593,10 @@ private[spark] class ConnectionManager(port: Int) extends Logging {
 
 private[spark] object ConnectionManager {
 
+  import SparkContext.globalConf
+
   def main(args: Array[String]) {
-    val manager = new ConnectionManager(9999)
+    val manager = new ConnectionManager(9999, globalConf)
     manager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
       println("Received [" + msg + "] from [" + id + "]")
       None

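The connection manager's thread pools are sized from a family of related keys on the same conf. A sketch of one of those pools, using the same keys and defaults as the hunk above:

    import java.util.concurrent.{LinkedBlockingDeque, ThreadPoolExecutor, TimeUnit}

    import org.apache.spark.SparkConf

    object HandlerPoolSketch {
      // Mirrors handleMessageExecutor above: core size, max size and keep-alive
      // all come from spark.core.connection.handler.* on the supplied conf.
      def handlerPool(conf: SparkConf): ThreadPoolExecutor =
        new ThreadPoolExecutor(
          conf.getOrElse("spark.core.connection.handler.threads.min", "20").toInt,
          conf.getOrElse("spark.core.connection.handler.threads.max", "60").toInt,
          conf.getOrElse("spark.core.connection.handler.threads.keepalive", "60").toInt,
          TimeUnit.SECONDS,
          new LinkedBlockingDeque[Runnable]())
    }
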
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2573add9/core/src/main/scala/org/apache/spark/network/ReceiverTest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/ReceiverTest.scala b/core/src/main/scala/org/apache/spark/network/ReceiverTest.scala
index 7817151..4ca3cd3 100644
--- a/core/src/main/scala/org/apache/spark/network/ReceiverTest.scala
+++ b/core/src/main/scala/org/apache/spark/network/ReceiverTest.scala
@@ -21,9 +21,9 @@ import java.nio.ByteBuffer
 import java.net.InetAddress
 
 private[spark] object ReceiverTest {
-
+  import org.apache.spark.SparkContext.globalConf
   def main(args: Array[String]) {
-    val manager = new ConnectionManager(9999)
+    val manager = new ConnectionManager(9999, globalConf)
     println("Started connection manager with id = " + manager.id)
     
     manager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => { 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2573add9/core/src/main/scala/org/apache/spark/network/SenderTest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/SenderTest.scala b/core/src/main/scala/org/apache/spark/network/SenderTest.scala
index 7775749..11c21fc 100644
--- a/core/src/main/scala/org/apache/spark/network/SenderTest.scala
+++ b/core/src/main/scala/org/apache/spark/network/SenderTest.scala
@@ -21,7 +21,7 @@ import java.nio.ByteBuffer
 import java.net.InetAddress
 
 private[spark] object SenderTest {
-
+  import org.apache.spark.SparkContext.globalConf
   def main(args: Array[String]) {
     
     if (args.length < 2) {
@@ -33,7 +33,7 @@ private[spark] object SenderTest {
     val targetPort = args(1).toInt
     val targetConnectionManagerId = new ConnectionManagerId(targetHost, targetPort)
 
-    val manager = new ConnectionManager(0)
+    val manager = new ConnectionManager(0, globalConf)
     println("Started connection manager with id = " + manager.id)
 
     manager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => { 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2573add9/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala b/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala
index b1e1576..81b3104 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala
@@ -23,20 +23,20 @@ import io.netty.buffer.ByteBuf
 import io.netty.channel.ChannelHandlerContext
 import io.netty.util.CharsetUtil
 
-import org.apache.spark.Logging
+import org.apache.spark.{SparkContext, SparkConf, Logging}
 import org.apache.spark.network.ConnectionManagerId
 
 import scala.collection.JavaConverters._
 import org.apache.spark.storage.BlockId
 
 
-private[spark] class ShuffleCopier extends Logging {
+private[spark] class ShuffleCopier(conf: SparkConf) extends Logging {
 
   def getBlock(host: String, port: Int, blockId: BlockId,
       resultCollectCallback: (BlockId, Long, ByteBuf) => Unit) {
 
     val handler = new ShuffleCopier.ShuffleClientHandler(resultCollectCallback)
-    val connectTimeout = System.getProperty("spark.shuffle.netty.connect.timeout", "60000").toInt
+    val connectTimeout = conf.getOrElse("spark.shuffle.netty.connect.timeout",  "60000").toInt
     val fc = new FileClient(handler, connectTimeout)
 
     try {
@@ -107,7 +107,7 @@ private[spark] object ShuffleCopier extends Logging {
     val tasks = (for (i <- Range(0, threads)) yield { 
       Executors.callable(new Runnable() {
         def run() {
-          val copier = new ShuffleCopier()
+          val copier = new ShuffleCopier(SparkContext.globalConf)
           copier.getBlock(host, port, blockId, echoResultCollectCallBack)
         }
       })

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2573add9/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
index a712ef1..9fbe002 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
@@ -75,6 +75,8 @@ class CheckpointRDD[T: ClassTag](sc: SparkContext, val checkpointPath: String)
 
 private[spark] object CheckpointRDD extends Logging {
 
+  import SparkContext.{globalConf => conf}
+
   def splitIdToFile(splitId: Int): String = {
     "part-%05d".format(splitId)
   }
@@ -92,7 +94,7 @@ private[spark] object CheckpointRDD extends Logging {
       throw new IOException("Checkpoint failed: temporary path " +
         tempOutputPath + " already exists")
     }
-    val bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
+    val bufferSize = conf.getOrElse("spark.buffer.size",  "65536").toInt
 
     val fileOutputStream = if (blockSize < 0) {
       fs.create(tempOutputPath, false, bufferSize)
@@ -122,7 +124,7 @@ private[spark] object CheckpointRDD extends Logging {
   def readFromFile[T](path: Path, context: TaskContext): Iterator[T] = {
     val env = SparkEnv.get
     val fs = path.getFileSystem(SparkHadoopUtil.get.newConfiguration())
-    val bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
+    val bufferSize = conf.getOrElse("spark.buffer.size",  "65536").toInt
     val fileInputStream = fs.open(path, bufferSize)
     val serializer = env.serializer.newInstance()
     val deserializeStream = serializer.deserializeStream(fileInputStream)

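CheckpointRDD is an object with no SparkContext of its own, so it imports the global conf under a local name and reads spark.buffer.size from it on both the write and read paths. A tiny sketch of that renamed-import idiom; as with the executor sketch earlier, it assumes the code lives under the org.apache.spark package since globalConf is spark-internal.

    package org.apache.spark.sketches

    import org.apache.spark.SparkContext.{globalConf => conf}   // same renamed import as above

    object BufferSizeSketch {
      def main(args: Array[String]) {
        val bufferSize = conf.getOrElse("spark.buffer.size", "65536").toInt
        println("checkpoint I/O buffer: " + bufferSize + " bytes")
      }
    }
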
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2573add9/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index ea45566..f8b1a69 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -81,6 +81,7 @@ abstract class RDD[T: ClassTag](
   def this(@transient oneParent: RDD[_]) =
     this(oneParent.context , List(new OneToOneDependency(oneParent)))
 
+  private[spark] def conf = sc.conf
   // =======================================================================
   // Methods that should be implemented by subclasses of RDD
   // =======================================================================

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2573add9/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
index 6092783..3f55cd5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
@@ -41,7 +41,7 @@ import org.apache.spark.storage.StorageLevel
 class JobLogger(val user: String, val logDirName: String)
   extends SparkListener with Logging {
 
-  def this() = this(System.getProperty("user.name", "<unknown>"),
+  def this() = this(System.getProperty("user.name",  "<unknown>"),
     String.valueOf(System.currentTimeMillis()))
 
   private val logDir =

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2573add9/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
index 356fe56..9002d33 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
@@ -20,7 +20,7 @@ package org.apache.spark.scheduler
 import java.io.{FileInputStream, InputStream}
 import java.util.{NoSuchElementException, Properties}
 
-import org.apache.spark.Logging
+import org.apache.spark.{SparkConf, Logging}
 
 import scala.xml.XML
 
@@ -49,10 +49,10 @@ private[spark] class FIFOSchedulableBuilder(val rootPool: Pool)
   }
 }
 
-private[spark] class FairSchedulableBuilder(val rootPool: Pool)
+private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
   extends SchedulableBuilder with Logging {
 
-  val schedulerAllocFile = Option(System.getProperty("spark.scheduler.allocation.file"))
+  val schedulerAllocFile = Option(conf.get("spark.scheduler.allocation.file"))
   val DEFAULT_SCHEDULER_FILE = "fairscheduler.xml"
   val FAIR_SCHEDULER_PROPERTIES = "spark.scheduler.pool"
   val DEFAULT_POOL_NAME = "default"

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2573add9/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
index 66ab8ea..7e231ec 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
@@ -49,11 +49,12 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
   extends TaskScheduler
   with Logging
 {
+  val conf = sc.conf
   // How often to check for speculative tasks
-  val SPECULATION_INTERVAL = System.getProperty("spark.speculation.interval", "100").toLong
+  val SPECULATION_INTERVAL = conf.getOrElse("spark.speculation.interval",  "100").toLong
 
   // Threshold above which we warn user initial TaskSet may be starved
-  val STARVATION_TIMEOUT = System.getProperty("spark.starvation.timeout", "15000").toLong
+  val STARVATION_TIMEOUT = conf.getOrElse("spark.starvation.timeout",  "15000").toLong
 
   // ClusterTaskSetManagers are not thread safe, so any access to one should be synchronized
   // on this class.
@@ -90,7 +91,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
   var rootPool: Pool = null
   // default scheduler is FIFO
   val schedulingMode: SchedulingMode = SchedulingMode.withName(
-    System.getProperty("spark.scheduler.mode", "FIFO"))
+    conf.getOrElse("spark.scheduler.mode",  "FIFO"))
 
   // This is a var so that we can reset it for testing purposes.
   private[spark] var taskResultGetter = new TaskResultGetter(sc.env, this)
@@ -108,7 +109,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
         case SchedulingMode.FIFO =>
           new FIFOSchedulableBuilder(rootPool)
         case SchedulingMode.FAIR =>
-          new FairSchedulableBuilder(rootPool)
+          new FairSchedulableBuilder(rootPool, conf)
       }
     }
     schedulableBuilder.buildPools()
@@ -119,7 +120,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
   override def start() {
     backend.start()
 
-    if (System.getProperty("spark.speculation", "false").toBoolean) {
+    if (conf.getOrElse("spark.speculation",  "false").toBoolean) {
       logInfo("Starting speculative execution thread")
       import sc.env.actorSystem.dispatcher
       sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds,

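Speculative execution is switched on and tuned entirely from the scheduler's conf. A small sketch of those two reads with the same keys and defaults as above; enabling the flag here is only for illustration:

    import scala.concurrent.duration._

    import org.apache.spark.SparkConf

    object SpeculationSketch {
      def main(args: Array[String]) {
        val conf = new SparkConf(false)
        conf.set("spark.speculation", "true")

        val interval = conf.getOrElse("spark.speculation.interval", "100").toLong
        if (conf.getOrElse("spark.speculation", "false").toBoolean) {
          println("would check for speculative tasks every " + interval.milliseconds)
        }
      }
    }
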
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2573add9/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala
index bf494aa..398b0ce 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala
@@ -50,15 +50,16 @@ private[spark] class ClusterTaskSetManager(
   extends TaskSetManager
   with Logging
 {
+  val conf = sched.sc.conf
   // CPUs to request per task
-  val CPUS_PER_TASK = System.getProperty("spark.task.cpus", "1").toInt
+  val CPUS_PER_TASK = conf.getOrElse("spark.task.cpus",  "1").toInt
 
   // Maximum times a task is allowed to fail before failing the job
-  val MAX_TASK_FAILURES = System.getProperty("spark.task.maxFailures", "4").toInt
+  val MAX_TASK_FAILURES = conf.getOrElse("spark.task.maxFailures",  "4").toInt
 
   // Quantile of tasks at which to start speculation
-  val SPECULATION_QUANTILE = System.getProperty("spark.speculation.quantile", "0.75").toDouble
-  val SPECULATION_MULTIPLIER = System.getProperty("spark.speculation.multiplier", "1.5").toDouble
+  val SPECULATION_QUANTILE = conf.getOrElse("spark.speculation.quantile",  "0.75").toDouble
+  val SPECULATION_MULTIPLIER = conf.getOrElse("spark.speculation.multiplier",  "1.5").toDouble
 
   // Serializer for closures and tasks.
   val env = SparkEnv.get
@@ -117,7 +118,7 @@ private[spark] class ClusterTaskSetManager(
 
   // How frequently to reprint duplicate exceptions in full, in milliseconds
   val EXCEPTION_PRINT_INTERVAL =
-    System.getProperty("spark.logging.exceptionPrintInterval", "10000").toLong
+    conf.getOrElse("spark.logging.exceptionPrintInterval",  "10000").toLong
 
   // Map of recent exceptions (identified by string representation and top stack frame) to
   // duplicate count (how many times the same exception has appeared) and time the full exception
@@ -677,14 +678,14 @@ private[spark] class ClusterTaskSetManager(
   }
 
   private def getLocalityWait(level: TaskLocality.TaskLocality): Long = {
-    val defaultWait = System.getProperty("spark.locality.wait", "3000")
+    val defaultWait = conf.getOrElse("spark.locality.wait",  "3000")
     level match {
       case TaskLocality.PROCESS_LOCAL =>
-        System.getProperty("spark.locality.wait.process", defaultWait).toLong
+        conf.getOrElse("spark.locality.wait.process",  defaultWait).toLong
       case TaskLocality.NODE_LOCAL =>
-        System.getProperty("spark.locality.wait.node", defaultWait).toLong
+        conf.getOrElse("spark.locality.wait.node",  defaultWait).toLong
       case TaskLocality.RACK_LOCAL =>
-        System.getProperty("spark.locality.wait.rack", defaultWait).toLong
+        conf.getOrElse("spark.locality.wait.rack",  defaultWait).toLong
       case TaskLocality.ANY =>
         0L
     }
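
getLocalityWait reads a per-level key and falls back to the generic spark.locality.wait value. A sketch of the same fallback chain, with plain strings standing in for the TaskLocality enum used in the real code:

    import org.apache.spark.SparkConf

    object LocalityWaitSketch {
      def localityWait(conf: SparkConf, level: String): Long = {
        val defaultWait = conf.getOrElse("spark.locality.wait", "3000")
        level match {
          case "PROCESS_LOCAL" => conf.getOrElse("spark.locality.wait.process", defaultWait).toLong
          case "NODE_LOCAL"    => conf.getOrElse("spark.locality.wait.node", defaultWait).toLong
          case "RACK_LOCAL"    => conf.getOrElse("spark.locality.wait.rack", defaultWait).toLong
          case _               => 0L
        }
      }

      def main(args: Array[String]) {
        val conf = new SparkConf(false)
        conf.set("spark.locality.wait", "6000")
        println(localityWait(conf, "NODE_LOCAL"))   // 6000: per-level key unset, generic wait applies
      }
    }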