Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/02 06:29:43 UTC

[05/33] Various fixes to configuration code
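
Before the per-file hunks, a minimal sketch of the pattern this patch moves toward: an
explicit SparkConf built by the caller and threaded into constructors, instead of a
process-global configuration. The master URL, app name, and property values below are
made up for illustration; the setters and the getOrElse accessor are the ones that
appear in the hunks that follow.

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.serializer.KryoSerializer

    // Build the configuration explicitly and pass it to whatever needs it.
    val conf = new SparkConf()
      .setMaster("local[2]")            // illustrative master URL
      .setAppName("config-example")     // illustrative app name
    conf.set("spark.kryoserializer.buffer.mb", "2")

    // Components now receive the conf instead of reading a global one.
    val serializer = new KryoSerializer(conf)
    val sc = new SparkContext(conf)

    // Defaults live at the call site, as in the hunks below.
    val reviveInterval = conf.getOrElse("spark.scheduler.revive.interval", "1000").toLong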

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/642029e7/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala
index 398b0ce..a46b16b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala
@@ -52,14 +52,14 @@ private[spark] class ClusterTaskSetManager(
 {
   val conf = sched.sc.conf
   // CPUs to request per task
-  val CPUS_PER_TASK = conf.getOrElse("spark.task.cpus",  "1").toInt
+  val CPUS_PER_TASK = conf.getOrElse("spark.task.cpus", "1").toInt
 
   // Maximum times a task is allowed to fail before failing the job
-  val MAX_TASK_FAILURES = conf.getOrElse("spark.task.maxFailures",  "4").toInt
+  val MAX_TASK_FAILURES = conf.getOrElse("spark.task.maxFailures", "4").toInt
 
   // Quantile of tasks at which to start speculation
-  val SPECULATION_QUANTILE = conf.getOrElse("spark.speculation.quantile",  "0.75").toDouble
-  val SPECULATION_MULTIPLIER = conf.getOrElse("spark.speculation.multiplier",  "1.5").toDouble
+  val SPECULATION_QUANTILE = conf.getOrElse("spark.speculation.quantile", "0.75").toDouble
+  val SPECULATION_MULTIPLIER = conf.getOrElse("spark.speculation.multiplier", "1.5").toDouble
 
   // Serializer for closures and tasks.
   val env = SparkEnv.get
@@ -118,7 +118,7 @@ private[spark] class ClusterTaskSetManager(
 
   // How frequently to reprint duplicate exceptions in full, in milliseconds
   val EXCEPTION_PRINT_INTERVAL =
-    conf.getOrElse("spark.logging.exceptionPrintInterval",  "10000").toLong
+    conf.getOrElse("spark.logging.exceptionPrintInterval", "10000").toLong
 
   // Map of recent exceptions (identified by string representation and top stack frame) to
   // duplicate count (how many times the same exception has appeared) and time the full exception
@@ -678,7 +678,7 @@ private[spark] class ClusterTaskSetManager(
   }
 
   private def getLocalityWait(level: TaskLocality.TaskLocality): Long = {
-    val defaultWait = conf.getOrElse("spark.locality.wait",  "3000")
+    val defaultWait = conf.getOrElse("spark.locality.wait", "3000")
     level match {
       case TaskLocality.PROCESS_LOCAL =>
         conf.getOrElse("spark.locality.wait.process",  defaultWait).toLong

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/642029e7/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 4055590..156b01b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -62,7 +62,7 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac
       context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
 
       // Periodically revive offers to allow delay scheduling to work
-      val reviveInterval = conf.getOrElse("spark.scheduler.revive.interval",  "1000").toLong
+      val reviveInterval = conf.getOrElse("spark.scheduler.revive.interval", "1000").toLong
       import context.dispatcher
       context.system.scheduler.schedule(0.millis, reviveInterval.millis, self, ReviveOffers)
     }
@@ -118,7 +118,7 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac
         removeExecutor(executorId, reason)
         sender ! true
 
-      case DisassociatedEvent(_, address, _) => 
+      case DisassociatedEvent(_, address, _) =>
         addressToExecutorId.get(address).foreach(removeExecutor(_, "remote Akka client disassociated"))
 
     }
@@ -163,10 +163,7 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac
 
   override def start() {
     val properties = new ArrayBuffer[(String, String)]
-    val iterator = scheduler.sc.conf.getAllConfiguration
-    while (iterator.hasNext) {
-      val entry = iterator.next
-      val (key, value) = (entry.getKey.toString, entry.getValue.toString)
+    for ((key, value) <- scheduler.sc.conf.getAll) {
       if (key.startsWith("spark.") && !key.equals("spark.hostPort")) {
         properties += ((key, value))
       }
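
The loop above replaces a Java-style iterator with SparkConf.getAll, which yields
(key, value) pairs. A standalone sketch of the same filtering idiom, with illustrative
property values:

    import scala.collection.mutable.ArrayBuffer
    import org.apache.spark.SparkConf

    val conf = new SparkConf(false)
    conf.set("spark.app.name", "demo")        // kept: starts with "spark."
    conf.set("spark.hostPort", "host:1234")   // dropped: explicitly excluded

    // Collect every spark.* property except spark.hostPort, as start() does.
    val properties = new ArrayBuffer[(String, String)]
    for ((key, value) <- conf.getAll) {
      if (key.startsWith("spark.") && !key.equals("spark.hostPort")) {
        properties += ((key, value))
      }
    }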

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/642029e7/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerBackend.scala
index 5367218..65d3fc8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerBackend.scala
@@ -31,7 +31,4 @@ private[spark] trait SchedulerBackend {
   def defaultParallelism(): Int
 
   def killTask(taskId: Long, executorId: String): Unit = throw new UnsupportedOperationException
-
-  // Memory used by each executor (in megabytes)
-  protected val executorMemory: Int = SparkContext.executorMemoryRequested
 }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/642029e7/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
index d01329b..d74f000 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
@@ -31,7 +31,7 @@ private[spark] class SimrSchedulerBackend(
   val tmpPath = new Path(driverFilePath + "_tmp")
   val filePath = new Path(driverFilePath)
 
-  val maxCores = conf.getOrElse("spark.simr.executor.cores",  "1").toInt
+  val maxCores = conf.getOrElse("spark.simr.executor.cores", "1").toInt
 
   override def start() {
     super.start()

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/642029e7/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index d6b8ac2..de69e32 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -49,7 +49,7 @@ private[spark] class SparkDeploySchedulerBackend(
     val command = Command(
       "org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs)
     val sparkHome = sc.getSparkHome().getOrElse(null)
-    val appDesc = new ApplicationDescription(appName, maxCores, executorMemory, command, sparkHome,
+    val appDesc = new ApplicationDescription(appName, maxCores, sc.executorMemory, command, sparkHome,
         "http://" + sc.ui.appUIAddress)
 
     client = new Client(sc.env.actorSystem, masters, appDesc, this, conf)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/642029e7/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskResultGetter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskResultGetter.scala
index ff6cc37..319c91b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskResultGetter.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskResultGetter.scala
@@ -32,7 +32,7 @@ import org.apache.spark.util.Utils
 private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: ClusterScheduler)
   extends Logging {
 
-  private val THREADS = sparkEnv.conf.getOrElse("spark.resultGetter.threads",  "4").toInt
+  private val THREADS = sparkEnv.conf.getOrElse("spark.resultGetter.threads", "4").toInt
   private val getTaskResultExecutor = Utils.newDaemonFixedThreadPool(
     THREADS, "Result resolver thread")
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/642029e7/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index 2a3b0e1..1695374 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -76,7 +76,7 @@ private[spark] class CoarseMesosSchedulerBackend(
     "Spark home is not set; set it through the spark.home system " +
     "property, the SPARK_HOME environment variable or the SparkContext constructor"))
 
-  val extraCoresPerSlave = conf.getOrElse("spark.mesos.extra.cores",  "0").toInt
+  val extraCoresPerSlave = conf.getOrElse("spark.mesos.extra.cores", "0").toInt
 
   var nextMesosTaskId = 0
 
@@ -176,7 +176,7 @@ private[spark] class CoarseMesosSchedulerBackend(
         val slaveId = offer.getSlaveId.toString
         val mem = getResource(offer.getResourcesList, "mem")
         val cpus = getResource(offer.getResourcesList, "cpus").toInt
-        if (totalCoresAcquired < maxCores && mem >= executorMemory && cpus >= 1 &&
+        if (totalCoresAcquired < maxCores && mem >= sc.executorMemory && cpus >= 1 &&
             failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES &&
             !slaveIdsWithExecutors.contains(slaveId)) {
           // Launch an executor on the slave
@@ -192,7 +192,7 @@ private[spark] class CoarseMesosSchedulerBackend(
             .setCommand(createCommand(offer, cpusToUse + extraCoresPerSlave))
             .setName("Task " + taskId)
             .addResources(createResource("cpus", cpusToUse))
-            .addResources(createResource("mem", executorMemory))
+            .addResources(createResource("mem", sc.executorMemory))
             .build()
           d.launchTasks(offer.getId, Collections.singletonList(task), filters)
         } else {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/642029e7/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index 9bb92b4..8dfd4d5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -114,7 +114,7 @@ private[spark] class MesosSchedulerBackend(
     val memory = Resource.newBuilder()
       .setName("mem")
       .setType(Value.Type.SCALAR)
-      .setScalar(Value.Scalar.newBuilder().setValue(executorMemory).build())
+      .setScalar(Value.Scalar.newBuilder().setValue(sc.executorMemory).build())
       .build()
     ExecutorInfo.newBuilder()
       .setExecutorId(ExecutorID.newBuilder().setValue(execId).build())
@@ -199,7 +199,7 @@ private[spark] class MesosSchedulerBackend(
         def enoughMemory(o: Offer) = {
           val mem = getResource(o.getResourcesList, "mem")
           val slaveId = o.getSlaveId.getValue
-          mem >= executorMemory || slaveIdsWithExecutors.contains(slaveId)
+          mem >= sc.executorMemory || slaveIdsWithExecutors.contains(slaveId)
         }
 
         for ((offer, index) <- offers.zipWithIndex if enoughMemory(offer)) {
@@ -341,5 +341,5 @@ private[spark] class MesosSchedulerBackend(
   }
 
   // TODO: query Mesos for number of cores
-  override def defaultParallelism() = sc.conf.getOrElse("spark.default.parallelism",  "8").toInt
+  override def defaultParallelism() = sc.conf.getOrElse("spark.default.parallelism", "8").toInt
 }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/642029e7/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala
index 6069c1d..8498cff 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala
@@ -92,7 +92,7 @@ private[spark] class LocalScheduler(val threads: Int, val maxFailures: Int, val
   var schedulableBuilder: SchedulableBuilder = null
   var rootPool: Pool = null
   val schedulingMode: SchedulingMode = SchedulingMode.withName(
-    conf.getOrElse("spark.scheduler.mode",  "FIFO"))
+    conf.getOrElse("spark.scheduler.mode", "FIFO"))
   val activeTaskSets = new HashMap[String, LocalTaskSetManager]
   val taskIdToTaskSetId = new HashMap[Long, String]
   val taskSetTaskIds = new HashMap[String, HashSet[Long]]

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/642029e7/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
index 4de8161..5d3d436 100644
--- a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
@@ -21,6 +21,7 @@ import java.io._
 import java.nio.ByteBuffer
 
 import org.apache.spark.util.ByteBufferInputStream
+import org.apache.spark.SparkConf
 
 private[spark] class JavaSerializationStream(out: OutputStream) extends SerializationStream {
   val objOut = new ObjectOutputStream(out)
@@ -77,6 +78,6 @@ private[spark] class JavaSerializerInstance extends SerializerInstance {
 /**
  * A Spark serializer that uses Java's built-in serialization.
  */
-class JavaSerializer extends Serializer {
+class JavaSerializer(conf: SparkConf) extends Serializer {
   def newInstance(): SerializerInstance = new JavaSerializerInstance
 }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/642029e7/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index 17cec81..2367f3f 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -25,20 +25,21 @@ import com.esotericsoftware.kryo.{KryoException, Kryo}
 import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
 import com.twitter.chill.{EmptyScalaKryoInstantiator, AllScalaRegistrar}
 
-import org.apache.spark.{SparkContext, SparkConf, SerializableWritable, Logging}
+import org.apache.spark._
 import org.apache.spark.broadcast.HttpBroadcast
 import org.apache.spark.scheduler.MapStatus
 import org.apache.spark.storage._
 import scala.util.Try
+import org.apache.spark.storage.PutBlock
+import org.apache.spark.storage.GetBlock
+import org.apache.spark.storage.GotBlock
 
 /**
  * A Spark serializer that uses the [[https://code.google.com/p/kryo/ Kryo serialization library]].
  */
-class KryoSerializer extends org.apache.spark.serializer.Serializer with Logging {
-
-  private val conf = SparkContext.globalConf
+class KryoSerializer(conf: SparkConf) extends org.apache.spark.serializer.Serializer with Logging {
   private val bufferSize = {
-    conf.getOrElse("spark.kryoserializer.buffer.mb",  "2").toInt * 1024 * 1024
+    conf.getOrElse("spark.kryoserializer.buffer.mb", "2").toInt * 1024 * 1024
   }
 
   def newKryoOutput() = new KryoOutput(bufferSize)
@@ -50,7 +51,7 @@ class KryoSerializer extends org.apache.spark.serializer.Serializer with Logging
 
     // Allow disabling Kryo reference tracking if user knows their object graphs don't have loops.
     // Do this before we invoke the user registrator so the user registrator can override this.
-    kryo.setReferences(conf.getOrElse("spark.kryo.referenceTracking",  "true").toBoolean)
+    kryo.setReferences(conf.getOrElse("spark.kryo.referenceTracking", "true").toBoolean)
 
     for (cls <- KryoSerializer.toRegister) kryo.register(cls)
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/642029e7/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala b/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
index 2955986..2246527 100644
--- a/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.serializer
 
 import java.util.concurrent.ConcurrentHashMap
+import org.apache.spark.SparkConf
 
 
 /**
@@ -32,12 +33,12 @@ private[spark] class SerializerManager {
 
   def default = _default
 
-  def setDefault(clsName: String): Serializer = {
-    _default = get(clsName)
+  def setDefault(clsName: String, conf: SparkConf): Serializer = {
+    _default = get(clsName, conf)
     _default
   }
 
-  def get(clsName: String): Serializer = {
+  def get(clsName: String, conf: SparkConf): Serializer = {
     if (clsName == null) {
       default
     } else {
@@ -51,8 +52,9 @@ private[spark] class SerializerManager {
         serializer = serializers.get(clsName)
         if (serializer == null) {
           val clsLoader = Thread.currentThread.getContextClassLoader
-          serializer =
-            Class.forName(clsName, true, clsLoader).newInstance().asInstanceOf[Serializer]
+          val cls = Class.forName(clsName, true, clsLoader)
+          val constructor = cls.getConstructor(classOf[SparkConf])
+          serializer = constructor.newInstance(conf).asInstanceOf[Serializer]
           serializers.put(clsName, serializer)
         }
         serializer
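
With the change above, SerializerManager instantiates serializer classes reflectively
through a constructor that takes a SparkConf, so any pluggable serializer must expose
one. A hypothetical serializer that satisfies the new contract by delegating to the
JavaSerializer from this patch:

    import org.apache.spark.SparkConf
    import org.apache.spark.serializer.{JavaSerializer, Serializer, SerializerInstance}

    // The single-argument SparkConf constructor is what
    // cls.getConstructor(classOf[SparkConf]) looks up above.
    class DelegatingSerializer(conf: SparkConf) extends Serializer {
      private val underlying = new JavaSerializer(conf)
      def newInstance(): SerializerInstance = underlying.newInstance()
    }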

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/642029e7/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
index ee2ae47..3b25f68 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
@@ -327,7 +327,7 @@ object BlockFetcherIterator {
         fetchRequestsSync.put(request)
       }
 
-      copiers = startCopiers(conf.getOrElse("spark.shuffle.copier.threads",  "6").toInt)
+      copiers = startCopiers(conf.getOrElse("spark.shuffle.copier.threads", "6").toInt)
       logInfo("Started " + fetchRequestsSync.size + " remote gets in " +
         Utils.getUsedTimeMs(startTime))
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/642029e7/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index ffd166e..16ee208 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -58,8 +58,8 @@ private[spark] class BlockManager(
 
   // If we use Netty for shuffle, start a new Netty-based shuffle sender service.
   private val nettyPort: Int = {
-    val useNetty = conf.getOrElse("spark.shuffle.use.netty",  "false").toBoolean
-    val nettyPortConfig = conf.getOrElse("spark.shuffle.sender.port",  "0").toInt
+    val useNetty = conf.getOrElse("spark.shuffle.use.netty", "false").toBoolean
+    val nettyPortConfig = conf.getOrElse("spark.shuffle.sender.port", "0").toInt
     if (useNetty) diskBlockManager.startShuffleBlockSender(nettyPortConfig) else 0
   }
 
@@ -72,18 +72,18 @@ private[spark] class BlockManager(
   // Max megabytes of data to keep in flight per reducer (to avoid over-allocating memory
   // for receiving shuffle outputs)
   val maxBytesInFlight =
-    conf.getOrElse("spark.reducer.maxMbInFlight",  "48").toLong * 1024 * 1024
+    conf.getOrElse("spark.reducer.maxMbInFlight", "48").toLong * 1024 * 1024
 
   // Whether to compress broadcast variables that are stored
-  val compressBroadcast = conf.getOrElse("spark.broadcast.compress",  "true").toBoolean
+  val compressBroadcast = conf.getOrElse("spark.broadcast.compress", "true").toBoolean
   // Whether to compress shuffle output that are stored
-  val compressShuffle = conf.getOrElse("spark.shuffle.compress",  "true").toBoolean
+  val compressShuffle = conf.getOrElse("spark.shuffle.compress", "true").toBoolean
   // Whether to compress RDD partitions that are stored serialized
-  val compressRdds = conf.getOrElse("spark.rdd.compress",  "false").toBoolean
+  val compressRdds = conf.getOrElse("spark.rdd.compress", "false").toBoolean
 
-  val heartBeatFrequency = BlockManager.getHeartBeatFrequencyFromSystemProperties
+  val heartBeatFrequency = BlockManager.getHeartBeatFrequency(conf)
 
-  val hostPort = Utils.localHostPort()
+  val hostPort = Utils.localHostPort(conf)
 
   val slaveActor = actorSystem.actorOf(Props(new BlockManagerSlaveActor(this)),
     name = "BlockManagerActor" + BlockManager.ID_GENERATOR.next)
@@ -101,8 +101,11 @@ private[spark] class BlockManager(
 
   var heartBeatTask: Cancellable = null
 
-  private val metadataCleaner = new MetadataCleaner(MetadataCleanerType.BLOCK_MANAGER, this.dropOldNonBroadcastBlocks)
-  private val broadcastCleaner = new MetadataCleaner(MetadataCleanerType.BROADCAST_VARS, this.dropOldBroadcastBlocks)
+  private val metadataCleaner = new MetadataCleaner(
+    MetadataCleanerType.BLOCK_MANAGER, this.dropOldNonBroadcastBlocks, conf)
+  private val broadcastCleaner = new MetadataCleaner(
+    MetadataCleanerType.BROADCAST_VARS, this.dropOldBroadcastBlocks, conf)
+
   initialize()
 
   // The compression codec to use. Note that the "lazy" val is necessary because we want to delay
@@ -110,14 +113,14 @@ private[spark] class BlockManager(
   // program could be using a user-defined codec in a third party jar, which is loaded in
   // Executor.updateDependencies. When the BlockManager is initialized, user level jars hasn't been
   // loaded yet.
-  private lazy val compressionCodec: CompressionCodec = CompressionCodec.createCodec()
+  private lazy val compressionCodec: CompressionCodec = CompressionCodec.createCodec(conf)
 
   /**
    * Construct a BlockManager with a memory limit set based on system properties.
    */
   def this(execId: String, actorSystem: ActorSystem, master: BlockManagerMaster,
            serializer: Serializer, conf: SparkConf) = {
-    this(execId, actorSystem, master, serializer, BlockManager.getMaxMemoryFromSystemProperties, conf)
+    this(execId, actorSystem, master, serializer, BlockManager.getMaxMemory(conf), conf)
   }
 
   /**
@@ -127,7 +130,7 @@ private[spark] class BlockManager(
   private def initialize() {
     master.registerBlockManager(blockManagerId, maxMemory, slaveActor)
     BlockManagerWorker.startBlockManagerWorker(this)
-    if (!BlockManager.getDisableHeartBeatsForTesting) {
+    if (!BlockManager.getDisableHeartBeatsForTesting(conf)) {
       heartBeatTask = actorSystem.scheduler.schedule(0.seconds, heartBeatFrequency.milliseconds) {
         heartBeat()
       }
@@ -440,7 +443,7 @@ private[spark] class BlockManager(
       : BlockFetcherIterator = {
 
     val iter =
-      if (conf.getOrElse("spark.shuffle.use.netty",  "false").toBoolean) {
+      if (conf.getOrElse("spark.shuffle.use.netty", "false").toBoolean) {
         new BlockFetcherIterator.NettyBlockFetcherIterator(this, blocksByAddress, serializer)
       } else {
         new BlockFetcherIterator.BasicBlockFetcherIterator(this, blocksByAddress, serializer)
@@ -466,7 +469,7 @@ private[spark] class BlockManager(
   def getDiskWriter(blockId: BlockId, file: File, serializer: Serializer, bufferSize: Int)
     : BlockObjectWriter = {
     val compressStream: OutputStream => OutputStream = wrapForCompression(blockId, _)
-    val syncWrites = conf.getOrElse("spark.shuffle.sync",  "false").toBoolean
+    val syncWrites = conf.getOrElse("spark.shuffle.sync", "false").toBoolean
     new DiskBlockObjectWriter(blockId, file, serializer, bufferSize, compressStream, syncWrites)
   }
 
@@ -858,19 +861,18 @@ private[spark] class BlockManager(
 
 
 private[spark] object BlockManager extends Logging {
-  import org.apache.spark.SparkContext.{globalConf => conf}
   val ID_GENERATOR = new IdGenerator
 
-  def getMaxMemoryFromSystemProperties: Long = {
-    val memoryFraction = conf.getOrElse("spark.storage.memoryFraction",  "0.66").toDouble
+  def getMaxMemory(conf: SparkConf): Long = {
+    val memoryFraction = conf.getOrElse("spark.storage.memoryFraction", "0.66").toDouble
     (Runtime.getRuntime.maxMemory * memoryFraction).toLong
   }
 
-  def getHeartBeatFrequencyFromSystemProperties: Long =
-    conf.getOrElse("spark.storage.blockManagerTimeoutIntervalMs",  "60000").toLong / 4
+  def getHeartBeatFrequency(conf: SparkConf): Long =
+    conf.getOrElse("spark.storage.blockManagerTimeoutIntervalMs", "60000").toLong / 4
 
-  def getDisableHeartBeatsForTesting: Boolean =
-    conf.getOrElse("spark.test.disableBlockManagerHeartBeat",  "false").toBoolean
+  def getDisableHeartBeatsForTesting(conf: SparkConf): Boolean =
+    conf.getOrElse("spark.test.disableBlockManagerHeartBeat", "false").toBoolean
 
   /**
    * Attempt to clean up a ByteBuffer if it is memory-mapped. This uses an *unsafe* Sun API that

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/642029e7/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index fde7d63..8e4a88b 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -31,8 +31,8 @@ private[spark]
 class BlockManagerMaster(var driverActor : Either[ActorRef, ActorSelection],
     conf: SparkConf) extends Logging {
 
-  val AKKA_RETRY_ATTEMPTS: Int = conf.getOrElse("spark.akka.num.retries",  "3").toInt
-  val AKKA_RETRY_INTERVAL_MS: Int = conf.getOrElse("spark.akka.retry.wait",  "3000").toInt
+  val AKKA_RETRY_ATTEMPTS: Int = conf.getOrElse("spark.akka.num.retries", "3").toInt
+  val AKKA_RETRY_INTERVAL_MS: Int = conf.getOrElse("spark.akka.retry.wait", "3000").toInt
 
   val DRIVER_AKKA_ACTOR_NAME = "BlockManagerMaster"
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/642029e7/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
index 05502e4..73a1da2 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
@@ -53,7 +53,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf) extends Act
   initLogging()
 
   val slaveTimeout = conf.getOrElse("spark.storage.blockManagerSlaveTimeoutMs",
-    "" + (BlockManager.getHeartBeatFrequencyFromSystemProperties * 3)).toLong
+    "" + (BlockManager.getHeartBeatFrequency(conf) * 3)).toLong
 
   val checkTimeoutInterval = conf.getOrElse("spark.storage.blockManagerTimeoutIntervalMs",
     "60000").toLong
@@ -61,7 +61,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf) extends Act
   var timeoutCheckingTask: Cancellable = null
 
   override def preStart() {
-    if (!BlockManager.getDisableHeartBeatsForTesting) {
+    if (!BlockManager.getDisableHeartBeatsForTesting(conf)) {
       import context.dispatcher
       timeoutCheckingTask = context.system.scheduler.schedule(
         0.seconds, checkTimeoutInterval.milliseconds, self, ExpireDeadHosts)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/642029e7/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index 8f528ba..7697092 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -38,7 +38,7 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD
   extends PathResolver with Logging {
 
   private val MAX_DIR_CREATION_ATTEMPTS: Int = 10
-  private val subDirsPerLocalDir = shuffleManager.conf.getOrElse("spark.diskStore.subDirectories",  "64").toInt
+  private val subDirsPerLocalDir = shuffleManager.conf.getOrElse("spark.diskStore.subDirectories", "64").toInt
 
   // Create one local directory for each path mentioned in spark.local.dir; then, inside this
   // directory, create multiple subdirectories that we will hash files into, in order to avoid

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/642029e7/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
index 850d317..f592df2 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
@@ -62,12 +62,13 @@ private[spark] trait ShuffleWriterGroup {
 private[spark]
 class ShuffleBlockManager(blockManager: BlockManager) {
   def conf = blockManager.conf
+
   // Turning off shuffle file consolidation causes all shuffle Blocks to get their own file.
   // TODO: Remove this once the shuffle file consolidation feature is stable.
   val consolidateShuffleFiles =
-    conf.getOrElse("spark.shuffle.consolidateFiles",  "false").toBoolean
+    conf.getOrElse("spark.shuffle.consolidateFiles", "false").toBoolean
 
-  private val bufferSize = conf.getOrElse("spark.shuffle.file.buffer.kb",  "100").toInt * 1024
+  private val bufferSize = conf.getOrElse("spark.shuffle.file.buffer.kb", "100").toInt * 1024
 
   /**
    * Contains all the state related to a particular shuffle. This includes a pool of unused
@@ -82,8 +83,8 @@ class ShuffleBlockManager(blockManager: BlockManager) {
   type ShuffleId = Int
   private val shuffleStates = new TimeStampedHashMap[ShuffleId, ShuffleState]
 
-  private
-  val metadataCleaner = new MetadataCleaner(MetadataCleanerType.SHUFFLE_BLOCK_MANAGER, this.cleanup)
+  private val metadataCleaner =
+    new MetadataCleaner(MetadataCleanerType.SHUFFLE_BLOCK_MANAGER, this.cleanup, conf)
 
   def forMapTask(shuffleId: Int, mapId: Int, numBuckets: Int, serializer: Serializer) = {
     new ShuffleWriterGroup {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/642029e7/core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala b/core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala
index d52b3d8..40734aa 100644
--- a/core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala
@@ -56,7 +56,7 @@ object StoragePerfTester {
 
     def writeOutputBytes(mapId: Int, total: AtomicLong) = {
       val shuffle = blockManager.shuffleBlockManager.forMapTask(1, mapId, numOutputSplits,
-        new KryoSerializer())
+        new KryoSerializer(sc.conf))
       val writers = shuffle.writers
       for (i <- 1 to recordsPerMap) {
         writers(i % numOutputSplits).write(writeData)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/642029e7/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
index b3b3893..dca98c6 100644
--- a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
@@ -22,7 +22,7 @@ import akka.actor._
 import java.util.concurrent.ArrayBlockingQueue
 import util.Random
 import org.apache.spark.serializer.KryoSerializer
-import org.apache.spark.SparkContext
+import org.apache.spark.{SparkConf, SparkContext}
 
 /**
  * This class tests the BlockManager and MemoryStore for thread safety and
@@ -92,8 +92,8 @@ private[spark] object ThreadingTest {
   def main(args: Array[String]) {
     System.setProperty("spark.kryoserializer.buffer.mb", "1")
     val actorSystem = ActorSystem("test")
-    val conf = SparkContext.globalConf
-    val serializer = new KryoSerializer
+    val conf = new SparkConf()
+    val serializer = new KryoSerializer(conf)
     val blockManagerMaster = new BlockManagerMaster(
       Left(actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf)))), conf)
     val blockManager = new BlockManager(

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/642029e7/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala b/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala
index 14751e8..58d47a2 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala
@@ -19,7 +19,7 @@ package org.apache.spark.ui
 
 import scala.util.Random
 
-import org.apache.spark.SparkContext
+import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.SparkContext._
 import org.apache.spark.scheduler.SchedulingMode
 
@@ -31,7 +31,6 @@ import org.apache.spark.scheduler.SchedulingMode
  */
 private[spark] object UIWorkloadGenerator {
 
-  import SparkContext.{globalConf => conf}
   val NUM_PARTITIONS = 100
   val INTER_JOB_WAIT_MS = 5000
 
@@ -40,14 +39,14 @@ private[spark] object UIWorkloadGenerator {
       println("usage: ./spark-class org.apache.spark.ui.UIWorkloadGenerator [master] [FIFO|FAIR]")
       System.exit(1)
     }
-    val master = args(0)
-    val schedulingMode = SchedulingMode.withName(args(1))
-    val appName = "Spark UI Tester"
 
+    val conf = new SparkConf().setMaster(args(0)).setAppName("Spark UI tester")
+
+    val schedulingMode = SchedulingMode.withName(args(1))
     if (schedulingMode == SchedulingMode.FAIR) {
-      conf.set("spark.scheduler.mode",  "FAIR")
+      conf.set("spark.scheduler.mode", "FAIR")
     }
-    val sc = new SparkContext(master, appName)
+    val sc = new SparkContext(conf)
 
     def setProperties(s: String) = {
       if(schedulingMode == SchedulingMode.FAIR) {
@@ -57,11 +56,11 @@ private[spark] object UIWorkloadGenerator {
     }
 
     val baseData = sc.makeRDD(1 to NUM_PARTITIONS * 10, NUM_PARTITIONS)
-    def nextFloat() = (new Random()).nextFloat()
+    def nextFloat() = new Random().nextFloat()
 
     val jobs = Seq[(String, () => Long)](
       ("Count", baseData.count),
-      ("Cache and Count", baseData.map(x => x).cache.count),
+      ("Cache and Count", baseData.map(x => x).cache().count),
       ("Single Shuffle", baseData.map(x => (x % 10, x)).reduceByKey(_ + _).count),
       ("Entirely failed phase", baseData.map(x => throw new Exception).count),
       ("Partially failed phase", {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/642029e7/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala
index b637d37..91fa00a 100644
--- a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala
@@ -63,7 +63,7 @@ private[spark] class EnvironmentUI(sc: SparkContext) {
       UIUtils.listingTable(propertyHeaders, propertyRow, otherProperties, fixedWidth = true)
 
     val classPathEntries = classPathProperty._2
-        .split(sc.conf.getOrElse("path.separator",  ":"))
+        .split(sc.conf.getOrElse("path.separator", ":"))
         .filterNot(e => e.isEmpty)
         .map(e => (e, "System Classpath"))
     val addedJars = sc.addedJars.iterator.toSeq.map{case (path, time) => (path, "Added By User")}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/642029e7/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index f01a138..6ff8e9f 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -33,7 +33,7 @@ import org.apache.spark.scheduler._
  */
 private[spark] class JobProgressListener(val sc: SparkContext) extends SparkListener {
   // How many stages to remember
-  val RETAINED_STAGES = sc.conf.getOrElse("spark.ui.retained_stages",  "1000").toInt
+  val RETAINED_STAGES = sc.conf.getOrElse("spark.ui.retained_stages", "1000").toInt
   val DEFAULT_POOL_NAME = "default"
 
   val stageIdToPool = new HashMap[Int, String]()
@@ -105,7 +105,7 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
     val stages = poolToActiveStages.getOrElseUpdate(poolName, new HashSet[StageInfo]())
     stages += stage
   }
-  
+
   override def onTaskStart(taskStart: SparkListenerTaskStart) = synchronized {
     val sid = taskStart.task.stageId
     val tasksActive = stageIdToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]())

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/642029e7/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index 76febd5..58b26f7 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -41,19 +41,19 @@ private[spark] object AkkaUtils {
   def createActorSystem(name: String, host: String, port: Int, indestructible: Boolean = false,
     conf: SparkConf): (ActorSystem, Int) = {
 
-    val akkaThreads   = conf.getOrElse("spark.akka.threads",  "4").toInt
-    val akkaBatchSize = conf.getOrElse("spark.akka.batchSize",  "15").toInt
+    val akkaThreads   = conf.getOrElse("spark.akka.threads", "4").toInt
+    val akkaBatchSize = conf.getOrElse("spark.akka.batchSize", "15").toInt
 
-    val akkaTimeout = conf.getOrElse("spark.akka.timeout",  "100").toInt
+    val akkaTimeout = conf.getOrElse("spark.akka.timeout", "100").toInt
 
-    val akkaFrameSize = conf.getOrElse("spark.akka.frameSize",  "10").toInt
+    val akkaFrameSize = conf.getOrElse("spark.akka.frameSize", "10").toInt
     val lifecycleEvents =
-      if (conf.getOrElse("spark.akka.logLifecycleEvents",  "false").toBoolean) "on" else "off"
+      if (conf.getOrElse("spark.akka.logLifecycleEvents", "false").toBoolean) "on" else "off"
 
-    val akkaHeartBeatPauses = conf.getOrElse("spark.akka.heartbeat.pauses",  "600").toInt
+    val akkaHeartBeatPauses = conf.getOrElse("spark.akka.heartbeat.pauses", "600").toInt
     val akkaFailureDetector =
-      conf.getOrElse("spark.akka.failure-detector.threshold",  "300.0").toDouble
-    val akkaHeartBeatInterval = conf.getOrElse("spark.akka.heartbeat.interval",  "1000").toInt
+      conf.getOrElse("spark.akka.failure-detector.threshold", "300.0").toDouble
+    val akkaHeartBeatInterval = conf.getOrElse("spark.akka.heartbeat.interval", "1000").toInt
 
     val akkaConf = ConfigFactory.parseString(
       s"""
@@ -89,6 +89,6 @@ private[spark] object AkkaUtils {
 
   /** Returns the default Spark timeout to use for Akka ask operations. */
   def askTimeout(conf: SparkConf): FiniteDuration = {
-    Duration.create(conf.getOrElse("spark.akka.askTimeout",  "30").toLong, "seconds")
+    Duration.create(conf.getOrElse("spark.akka.askTimeout", "30").toLong, "seconds")
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/642029e7/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
index bf71d17..431d888 100644
--- a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
@@ -18,16 +18,21 @@
 package org.apache.spark.util
 
 import java.util.{TimerTask, Timer}
-import org.apache.spark.{SparkContext, Logging}
+import org.apache.spark.{SparkConf, SparkContext, Logging}
 
 
 /**
  * Runs a timer task to periodically clean up metadata (e.g. old files or hashtable entries)
  */
-class MetadataCleaner(cleanerType: MetadataCleanerType.MetadataCleanerType, cleanupFunc: (Long) => Unit) extends Logging {
+class MetadataCleaner(
+    cleanerType: MetadataCleanerType.MetadataCleanerType,
+    cleanupFunc: (Long) => Unit,
+    conf: SparkConf)
+  extends Logging
+{
   val name = cleanerType.toString
 
-  private val delaySeconds = MetadataCleaner.getDelaySeconds
+  private val delaySeconds = MetadataCleaner.getDelaySeconds(conf)
   private val periodSeconds = math.max(10, delaySeconds / 10)
   private val timer = new Timer(name + " cleanup timer", true)
 
@@ -65,22 +70,28 @@ object MetadataCleanerType extends Enumeration {
   def systemProperty(which: MetadataCleanerType.MetadataCleanerType) = "spark.cleaner.ttl." + which.toString
 }
 
+// TODO: This mutates a Conf to set properties right now, which is kind of ugly when used in the
+// initialization of StreamingContext. It's okay for users trying to configure stuff themselves.
 object MetadataCleaner {
-  private val conf = SparkContext.globalConf
-  // using only sys props for now : so that workers can also get to it while preserving earlier behavior.
-  def getDelaySeconds = conf.getOrElse("spark.cleaner.ttl",  "3500").toInt //TODO: this is to fix tests for time being
+  def getDelaySeconds(conf: SparkConf) = {
+    conf.getOrElse("spark.cleaner.ttl", "3500").toInt
+  }
 
-  def getDelaySeconds(cleanerType: MetadataCleanerType.MetadataCleanerType): Int = {
-    conf.getOrElse(MetadataCleanerType.systemProperty(cleanerType),  getDelaySeconds.toString).toInt
+  def getDelaySeconds(conf: SparkConf, cleanerType: MetadataCleanerType.MetadataCleanerType): Int =
+  {
+    conf.getOrElse(MetadataCleanerType.systemProperty(cleanerType), getDelaySeconds(conf).toString)
+      .toInt
   }
 
-  def setDelaySeconds(cleanerType: MetadataCleanerType.MetadataCleanerType, delay: Int) {
+  def setDelaySeconds(conf: SparkConf, cleanerType: MetadataCleanerType.MetadataCleanerType,
+      delay: Int)
+  {
     conf.set(MetadataCleanerType.systemProperty(cleanerType),  delay.toString)
   }
 
-  def setDelaySeconds(delay: Int, resetAll: Boolean = true) {
+  def setDelaySeconds(conf: SparkConf, delay: Int, resetAll: Boolean = true) {
     // override for all ?
-    conf.set("spark.cleaner.ttl",  delay.toString)
+    conf.set("spark.cleaner.ttl", delay.toString)
     if (resetAll) {
       for (cleanerType <- MetadataCleanerType.values) {
         System.clearProperty(MetadataCleanerType.systemProperty(cleanerType))
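
After this hunk, each MetadataCleaner reads its TTL from the SparkConf it is handed
rather than from a global object. A sketch of constructing one directly, with an
illustrative TTL and cleanup function:

    import org.apache.spark.SparkConf
    import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType}

    val conf = new SparkConf(false)
    conf.set("spark.cleaner.ttl", "600")   // seconds; this patch's default is 3500

    // The cleaner derives its delay from the conf passed in, not from globals.
    val cleaner = new MetadataCleaner(
      MetadataCleanerType.BLOCK_MANAGER,
      (lastCleanupTime: Long) => println("dropping entries older than " + lastCleanupTime),
      conf)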

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/642029e7/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
index 1407c39..bddb3bb 100644
--- a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
+++ b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
@@ -30,10 +30,10 @@ import java.lang.management.ManagementFactory
 import scala.collection.mutable.ArrayBuffer
 
 import it.unimi.dsi.fastutil.ints.IntOpenHashSet
-import org.apache.spark.{SparkConf, SparkContext, Logging}
+import org.apache.spark.{SparkEnv, SparkConf, SparkContext, Logging}
 
 /**
- * Estimates the sizes of Java objects (number of bytes of memory they occupy), for use in 
+ * Estimates the sizes of Java objects (number of bytes of memory they occupy), for use in
  * memory-aware caches.
  *
  * Based on the following JavaWorld article:
@@ -41,7 +41,6 @@ import org.apache.spark.{SparkConf, SparkContext, Logging}
  */
 private[spark] object SizeEstimator extends Logging {
 
-  private def conf = SparkContext.globalConf
   // Sizes of primitive types
   private val BYTE_SIZE    = 1
   private val BOOLEAN_SIZE = 1
@@ -90,9 +89,11 @@ private[spark] object SizeEstimator extends Logging {
     classInfos.put(classOf[Object], new ClassInfo(objectSize, Nil))
   }
 
-  private def getIsCompressedOops : Boolean = {
-    if (conf.getOrElse("spark.test.useCompressedOops", null) != null) {
-      return conf.get("spark.test.useCompressedOops").toBoolean 
+  private def getIsCompressedOops: Boolean = {
+    // This is only used by tests to override the detection of compressed oops. The test
+    // actually uses a system property instead of a SparkConf, so we'll stick with that.
+    if (System.getProperty("spark.test.useCompressedOops") != null) {
+      return System.getProperty("spark.test.useCompressedOops").toBoolean
     }
 
     try {
@@ -104,7 +105,7 @@ private[spark] object SizeEstimator extends Logging {
       val getVMMethod = hotSpotMBeanClass.getDeclaredMethod("getVMOption",
           Class.forName("java.lang.String"))
 
-      val bean = ManagementFactory.newPlatformMXBeanProxy(server, 
+      val bean = ManagementFactory.newPlatformMXBeanProxy(server,
         hotSpotMBeanName, hotSpotMBeanClass)
       // TODO: We could use reflection on the VMOption returned ?
       return getVMMethod.invoke(bean, "UseCompressedOops").toString.contains("true")
@@ -252,7 +253,7 @@ private[spark] object SizeEstimator extends Logging {
     if (info != null) {
       return info
     }
-    
+
     val parent = getClassInfo(cls.getSuperclass)
     var shellSize = parent.shellSize
     var pointerFields = parent.pointerFields

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/642029e7/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index fd5888e..b6b89cc 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -36,15 +36,13 @@ import org.apache.hadoop.fs.{Path, FileSystem, FileUtil}
 import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance}
 import org.apache.spark.deploy.SparkHadoopUtil
 import java.nio.ByteBuffer
-import org.apache.spark.{SparkContext, SparkException, Logging}
+import org.apache.spark.{SparkConf, SparkContext, SparkException, Logging}
 
 
 /**
  * Various utility methods used by Spark.
  */
 private[spark] object Utils extends Logging {
-
-  private lazy val conf = SparkContext.globalConf
   /** Serialize an object using Java serialization */
   def serialize[T](o: T): Array[Byte] = {
     val bos = new ByteArrayOutputStream()
@@ -240,9 +238,9 @@ private[spark] object Utils extends Logging {
    * Throws SparkException if the target file already exists and has different contents than
    * the requested file.
    */
-  def fetchFile(url: String, targetDir: File) {
+  def fetchFile(url: String, targetDir: File, conf: SparkConf) {
     val filename = url.split("/").last
-    val tempDir = getLocalDir
+    val tempDir = getLocalDir(conf)
     val tempFile =  File.createTempFile("fetchFileTemp", null, new File(tempDir))
     val targetFile = new File(targetDir, filename)
     val uri = new URI(url)
@@ -312,7 +310,7 @@ private[spark] object Utils extends Logging {
    * return a single directory, even though the spark.local.dir property might be a list of
    * multiple paths.
    */
-  def getLocalDir: String = {
+  def getLocalDir(conf: SparkConf): String = {
     conf.getOrElse("spark.local.dir",  System.getProperty("java.io.tmpdir")).split(',')(0)
   }
 
@@ -398,7 +396,7 @@ private[spark] object Utils extends Logging {
     InetAddress.getByName(address).getHostName
   }
 
-  def localHostPort(): String = {
+  def localHostPort(conf: SparkConf): String = {
     val retval = conf.getOrElse("spark.hostPort",  null)
     if (retval == null) {
       logErrorWithStack("spark.hostPort not set but invoking localHostPort")
@@ -838,7 +836,7 @@ private[spark] object Utils extends Logging {
     }
   }
 
-  /** 
+  /**
    * Timing method based on iterations that permit JVM JIT optimization.
    * @param numIters number of iterations
    * @param f function to be executed

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/642029e7/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala b/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala
index ab81bfb..8d75460 100644
--- a/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala
+++ b/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala
@@ -20,9 +20,11 @@ package org.apache.spark.io
 import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
 
 import org.scalatest.FunSuite
+import org.apache.spark.SparkConf
 
 
 class CompressionCodecSuite extends FunSuite {
+  val conf = new SparkConf(false)
 
   def testCodec(codec: CompressionCodec) {
     // Write 1000 integers to the output stream, compressed.
@@ -43,19 +45,19 @@ class CompressionCodecSuite extends FunSuite {
   }
 
   test("default compression codec") {
-    val codec = CompressionCodec.createCodec()
+    val codec = CompressionCodec.createCodec(conf)
     assert(codec.getClass === classOf[LZFCompressionCodec])
     testCodec(codec)
   }
 
   test("lzf compression codec") {
-    val codec = CompressionCodec.createCodec(classOf[LZFCompressionCodec].getName)
+    val codec = CompressionCodec.createCodec(conf, classOf[LZFCompressionCodec].getName)
     assert(codec.getClass === classOf[LZFCompressionCodec])
     testCodec(codec)
   }
 
   test("snappy compression codec") {
-    val codec = CompressionCodec.createCodec(classOf[SnappyCompressionCodec].getName)
+    val codec = CompressionCodec.createCodec(conf, classOf[SnappyCompressionCodec].getName)
     assert(codec.getClass === classOf[SnappyCompressionCodec])
     testCodec(codec)
   }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/642029e7/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala
index 2bb827c..3711382 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala
@@ -82,7 +82,7 @@ class FakeClusterScheduler(sc: SparkContext, liveExecutors: (String, String)* /*
 class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
   import TaskLocality.{ANY, PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL}
   private val conf = new SparkConf
-  val LOCALITY_WAIT = conf.getOrElse("spark.locality.wait",  "3000").toLong
+  val LOCALITY_WAIT = conf.getOrElse("spark.locality.wait", "3000").toLong
 
   test("TaskSet with no preferences") {
     sc = new SparkContext("local", "test")

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/642029e7/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
index c016c51..33b0148 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
@@ -22,12 +22,14 @@ import scala.collection.mutable
 import com.esotericsoftware.kryo.Kryo
 
 import org.scalatest.FunSuite
-import org.apache.spark.SharedSparkContext
+import org.apache.spark.{SparkConf, SharedSparkContext}
 import org.apache.spark.serializer.KryoTest._
 
 class KryoSerializerSuite extends FunSuite with SharedSparkContext {
+  val conf = new SparkConf(false)
+
   test("basic types") {
-    val ser = (new KryoSerializer).newInstance()
+    val ser = new KryoSerializer(conf).newInstance()
     def check[T](t: T) {
       assert(ser.deserialize[T](ser.serialize(t)) === t)
     }
@@ -57,7 +59,7 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext {
   }
 
   test("pairs") {
-    val ser = (new KryoSerializer).newInstance()
+    val ser = new KryoSerializer(conf).newInstance()
     def check[T](t: T) {
       assert(ser.deserialize[T](ser.serialize(t)) === t)
     }
@@ -81,7 +83,7 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext {
   }
 
   test("Scala data structures") {
-    val ser = (new KryoSerializer).newInstance()
+    val ser = new KryoSerializer(conf).newInstance()
     def check[T](t: T) {
       assert(ser.deserialize[T](ser.serialize(t)) === t)
     }
@@ -104,7 +106,7 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext {
   }
 
   test("ranges") {
-    val ser = (new KryoSerializer).newInstance()
+    val ser = new KryoSerializer(conf).newInstance()
     def check[T](t: T) {
       assert(ser.deserialize[T](ser.serialize(t)) === t)
       // Check that very long ranges don't get written one element at a time
@@ -127,7 +129,7 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext {
   test("custom registrator") {
     System.setProperty("spark.kryo.registrator", classOf[MyRegistrator].getName)
 
-    val ser = (new KryoSerializer).newInstance()
+    val ser = new KryoSerializer(conf).newInstance()
     def check[T](t: T) {
       assert(ser.deserialize[T](ser.serialize(t)) === t)
     }
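
Each test now builds its serializer from an explicit SparkConf in place of the removed no-argument constructor, so per-suite settings travel with the conf. A sketch of the round-trip check the suite repeats, assuming a bare conf with no custom registrator:

    import org.apache.spark.SparkConf
    import org.apache.spark.serializer.KryoSerializer

    object KryoRoundTripSketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf(false)
        val ser = new KryoSerializer(conf).newInstance()
        // Serialize and immediately deserialize, expecting the original value back.
        def check[T](t: T) {
          assert(ser.deserialize[T](ser.serialize(t)) == t)
        }
        check(1)
        check("hello")
        check((1, "pair"))
        println("Kryo round-trips succeeded")
      }
    }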

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/642029e7/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 4ef5538..a0fc344 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -34,7 +34,7 @@ import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
 import org.apache.spark.{SparkConf, SparkContext}
 
 class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodTester {
-  private val conf = new SparkConf
+  private val conf = new SparkConf(false)
   var store: BlockManager = null
   var store2: BlockManager = null
   var actorSystem: ActorSystem = null
@@ -45,7 +45,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
 
   // Reuse a serializer across tests to avoid creating a new thread-local buffer on each test
   conf.set("spark.kryoserializer.buffer.mb", "1")
-  val serializer = new KryoSerializer
+  val serializer = new KryoSerializer(conf)
 
   // Implicitly convert strings to BlockIds for test clarity.
   implicit def StringToBlockId(value: String): BlockId = new TestBlockId(value)
@@ -167,7 +167,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
 
   test("master + 2 managers interaction") {
     store = new BlockManager("exec1", actorSystem, master, serializer, 2000, conf)
-    store2 = new BlockManager("exec2", actorSystem, master, new KryoSerializer, 2000, conf)
+    store2 = new BlockManager("exec2", actorSystem, master, new KryoSerializer(conf), 2000, conf)
 
     val peers = master.getPeers(store.blockManagerId, 1)
     assert(peers.size === 1, "master did not return the other manager as a peer")
@@ -654,7 +654,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
 
   test("block store put failure") {
     // Use Java serializer so we can create an unserializable error.
-    store = new BlockManager("<driver>", actorSystem, master, new JavaSerializer, 1200, conf)
+    store = new BlockManager("<driver>", actorSystem, master, new JavaSerializer(conf), 1200, conf)
 
     // The put should fail since a1 is not serializable.
     class UnserializableClass
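
The BlockManager tests thread the conf into both serializers: the shared Kryo instance picks up spark.kryoserializer.buffer.mb from the suite's conf, and JavaSerializer now takes a conf as well. A reduced sketch of that setup, leaving out the BlockManager itself since it needs a running actor system and master:

    import org.apache.spark.SparkConf
    import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}

    object SerializerConfSketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf(false)
        conf.set("spark.kryoserializer.buffer.mb", "1")  // Kryo buffer size comes from the conf
        val kryo = new KryoSerializer(conf).newInstance()
        val java = new JavaSerializer(conf).newInstance()
        assert(kryo.deserialize[String](kryo.serialize("payload")) == "payload")
        assert(java.deserialize[String](java.serialize("payload")) == "payload")
        println("both serializers round-trip with the shared conf")
      }
    }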

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/642029e7/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala b/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala
index a5facd5..11ebdc3 100644
--- a/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala
@@ -140,8 +140,6 @@ class SizeEstimatorSuite
   test("64-bit arch with no compressed oops") {
     val arch = System.setProperty("os.arch", "amd64")
     val oops = System.setProperty("spark.test.useCompressedOops", "false")
-    SparkContext.globalConf.set("os.arch", "amd64")
-    SparkContext.globalConf.set("spark.test.useCompressedOops", "false")
     val initialize = PrivateMethod[Unit]('initialize)
     SizeEstimator invokePrivate initialize()
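
With the global conf removed here, the test drives its scenario purely through system properties and then re-runs SizeEstimator's private initialize() so the new values are picked up. A test-scoped sketch of that ScalaTest PrivateMethodTester pattern; it sits in org.apache.spark.util on the assumption that SizeEstimator is not visible outside the Spark packages:

    package org.apache.spark.util

    import org.scalatest.PrivateMethodTester

    object SizeEstimatorReinitSketch extends PrivateMethodTester {
      def main(args: Array[String]): Unit = {
        // Pretend to be a 64-bit JVM without compressed oops, then force a re-read.
        System.setProperty("os.arch", "amd64")
        System.setProperty("spark.test.useCompressedOops", "false")
        val initialize = PrivateMethod[Unit]('initialize)
        SizeEstimator invokePrivate initialize()
        println("SizeEstimator re-initialized against the overridden properties")
      }
    }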
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/642029e7/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRank.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRank.scala b/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRank.scala
index 12c430b..4c0de46 100644
--- a/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRank.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRank.scala
@@ -37,7 +37,7 @@ object WikipediaPageRank {
       System.exit(-1)
     }
     val sparkConf = new SparkConf()
-    sparkConf.set("spark.serializer",  "org.apache.spark.serializer.KryoSerializer")
+    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
     sparkConf.set("spark.kryo.registrator",  classOf[PRKryoRegistrator].getName)
 
     val inputFile = args(0)
@@ -46,7 +46,7 @@ object WikipediaPageRank {
     val host = args(3)
     val usePartitioner = args(4).toBoolean
 
-    sparkConf.setMasterUrl(host).setAppName("WikipediaPageRank")
+    sparkConf.setMaster(host).setAppName("WikipediaPageRank")
     val sc = new SparkContext(sparkConf)
 
     // Parse the Wikipedia page data into a graph
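
The example now routes all of its configuration through the SparkConf builder, with setMaster replacing the old setMasterUrl, before handing the conf to SparkContext. A condensed sketch of that flow; the master URL is a placeholder and the Kryo registrator is left out since it is specific to the example:

    import org.apache.spark.{SparkConf, SparkContext}

    object PageRankConfSketch {
      def main(args: Array[String]): Unit = {
        val host = if (args.nonEmpty) args(0) else "local"  // placeholder master URL
        val sparkConf = new SparkConf()
        sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        sparkConf.setMaster(host).setAppName("WikipediaPageRank")
        val sc = new SparkContext(sparkConf)
        // ... load the page data and run the Bagel iterations as in the full example ...
        sc.stop()
      }
    }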

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/642029e7/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRankStandalone.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRankStandalone.scala b/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRankStandalone.scala
index 5bf0b7a..2cf273a 100644
--- a/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRankStandalone.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRankStandalone.scala
@@ -35,7 +35,7 @@ object WikipediaPageRankStandalone {
       System.exit(-1)
     }
     val sparkConf = new SparkConf()
-    sparkConf.set("spark.serializer",  "spark.bagel.examples.WPRSerializer")
+    sparkConf.set("spark.serializer", "spark.bagel.examples.WPRSerializer")
 
 
     val inputFile = args(0)
@@ -44,7 +44,7 @@ object WikipediaPageRankStandalone {
     val host = args(3)
     val usePartitioner = args(4).toBoolean
 
-    sparkConf.setMasterUrl(host).setAppName("WikipediaPageRankStandalone")
+    sparkConf.setMaster(host).setAppName("WikipediaPageRankStandalone")
 
     val sc = new SparkContext(sparkConf)
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/642029e7/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
index 2f2d106..8b27ecf 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
@@ -579,12 +579,12 @@ object ALS {
     val alpha = if (args.length >= 8) args(7).toDouble else 1
     val blocks = if (args.length == 9) args(8).toInt else -1
     val sc = new SparkContext(master, "ALS")
-    sc.conf.set("spark.serializer",  "org.apache.spark.serializer.KryoSerializer")
+    sc.conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
     sc.conf.set("spark.kryo.registrator",  classOf[ALSRegistrator].getName)
-    sc.conf.set("spark.kryo.referenceTracking",  "false")
-    sc.conf.set("spark.kryoserializer.buffer.mb",  "8")
-    sc.conf.set("spark.locality.wait",  "10000")
-    
+    sc.conf.set("spark.kryo.referenceTracking", "false")
+    sc.conf.set("spark.kryoserializer.buffer.mb", "8")
+    sc.conf.set("spark.locality.wait", "10000")
+
     val ratings = sc.textFile(ratingsFile).map { line =>
       val fields = line.split(',')
       Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble)
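
ALS applies its serialization and locality tuning on sc.conf right after the context is created. A sketch of just that tuning block, restricted to the keys the example sets (its Kryo registrator is omitted) and with the master URL as a placeholder:

    import org.apache.spark.SparkContext

    object AlsTuningSketch {
      def main(args: Array[String]): Unit = {
        val master = if (args.nonEmpty) args(0) else "local"  // placeholder master URL
        val sc = new SparkContext(master, "ALS")
        sc.conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        sc.conf.set("spark.kryo.referenceTracking", "false")
        sc.conf.set("spark.kryoserializer.buffer.mb", "8")
        sc.conf.set("spark.locality.wait", "10000")
        sc.stop()
      }
    }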

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/642029e7/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 433268a..91e35e2 100644
--- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -45,7 +45,7 @@ import org.apache.spark.util.Utils
 class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) extends Logging {
 
   def this(args: ApplicationMasterArguments) = this(args, new Configuration())
-  
+
   private var rpc: YarnRPC = YarnRPC.create(conf)
   private val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
   private var appAttemptId: ApplicationAttemptId = _
@@ -81,12 +81,12 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
     // Workaround until hadoop moves to something which has
     // https://issues.apache.org/jira/browse/HADOOP-8406 - fixed in (2.0.2-alpha but no 0.23 line)
     // org.apache.hadoop.io.compress.CompressionCodecFactory.getCodecClasses(conf)
-    
+
     ApplicationMaster.register(this)
 
     // Start the user's JAR
     userThread = startUserClass()
-    
+
     // This a bit hacky, but we need to wait until the spark.driver.port property has
     // been set by the Thread executing the user class.
     waitForSparkMaster()
@@ -99,7 +99,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
     // Allocate all containers
     allocateWorkers()
 
-    // Wait for the user class to Finish     
+    // Wait for the user class to Finish
     userThread.join()
 
     System.exit(0)
@@ -119,7 +119,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
     }
     localDirs
   }
-  
+
   private def getApplicationAttemptId(): ApplicationAttemptId = {
     val envs = System.getenv()
     val containerIdString = envs.get(ApplicationConstants.Environment.CONTAINER_ID.name())
@@ -128,17 +128,17 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
     logInfo("ApplicationAttemptId: " + appAttemptId)
     appAttemptId
   }
-  
+
   private def registerApplicationMaster(): RegisterApplicationMasterResponse = {
     logInfo("Registering the ApplicationMaster")
     amClient.registerApplicationMaster(Utils.localHostName(), 0, uiAddress)
   }
-  
+
   private def waitForSparkMaster() {
     logInfo("Waiting for Spark driver to be reachable.")
     var driverUp = false
     var tries = 0
-    val numTries = conf.getOrElse("spark.yarn.applicationMaster.waitTries",  "10").toInt
+    val numTries = conf.getOrElse("spark.yarn.applicationMaster.waitTries", "10").toInt
     while (!driverUp && tries < numTries) {
       val driverHost = conf.get("spark.driver.host")
       val driverPort = conf.get("spark.driver.port")
@@ -199,7 +199,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
       ApplicationMaster.sparkContextRef.synchronized {
         var numTries = 0
         val waitTime = 10000L
-        val maxNumTries = conf.getOrElse("spark.yarn.ApplicationMaster.waitTries",  "10").toInt
+        val maxNumTries = conf.getOrElse("spark.yarn.ApplicationMaster.waitTries", "10").toInt
         while (ApplicationMaster.sparkContextRef.get() == null && numTries < maxNumTries) {
           logInfo("Waiting for Spark context initialization ... " + numTries)
           numTries = numTries + 1
@@ -214,7 +214,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
             yarnConf,
             amClient,
             appAttemptId,
-            args, 
+            args,
             sparkContext.preferredNodeLocationData)
         } else {
           logWarning("Unable to retrieve SparkContext inspite of waiting for %d, maxNumTries = %d".
@@ -265,7 +265,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
 
       // we want to be reasonably responsive without causing too many requests to RM.
       val schedulerInterval =
-        conf.getOrElse("spark.yarn.scheduler.heartbeat.interval-ms",  "5000").toLong
+        conf.getOrElse("spark.yarn.scheduler.heartbeat.interval-ms", "5000").toLong
 
       // must be <= timeoutInterval / 2.
       val interval = math.min(timeoutInterval / 2, schedulerInterval)
@@ -314,11 +314,11 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
     for (container <- containers) {
       logInfo("Launching shell command on a new container."
         + ", containerId=" + container.getId()
-        + ", containerNode=" + container.getNodeId().getHost() 
+        + ", containerNode=" + container.getNodeId().getHost()
         + ":" + container.getNodeId().getPort()
         + ", containerNodeURI=" + container.getNodeHttpAddress()
         + ", containerState" + container.getState()
-        + ", containerResourceMemory"  
+        + ", containerResourceMemory"
         + container.getResource().getMemory())
     }
   }
@@ -338,12 +338,12 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
   }
 
   /**
-   * Clean up the staging directory. 
+   * Clean up the staging directory.
    */
-  private def cleanupStagingDir() { 
+  private def cleanupStagingDir() {
     var stagingDirPath: Path = null
     try {
-      val preserveFiles = conf.getOrElse("spark.yarn.preserve.staging.files",  "false").toBoolean
+      val preserveFiles = conf.getOrElse("spark.yarn.preserve.staging.files", "false").toBoolean
       if (!preserveFiles) {
         stagingDirPath = new Path(System.getenv("SPARK_YARN_STAGING_DIR"))
         if (stagingDirPath == null) {
@@ -359,7 +359,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
     }
   }
 
-  // The shutdown hook that runs when a signal is received AND during normal close of the JVM. 
+  // The shutdown hook that runs when a signal is received AND during normal close of the JVM.
   class AppMasterShutdownHook(appMaster: ApplicationMaster) extends Runnable {
 
     def run() {
@@ -415,18 +415,18 @@ object ApplicationMaster {
     // Note that this will unfortunately not properly clean up the staging files because it gets
     // called too late, after the filesystem is already shutdown.
     if (modified) {
-      Runtime.getRuntime().addShutdownHook(new Thread with Logging { 
+      Runtime.getRuntime().addShutdownHook(new Thread with Logging {
         // This is not only logs, but also ensures that log system is initialized for this instance
         // when we are actually 'run'-ing.
         logInfo("Adding shutdown hook for context " + sc)
-        override def run() { 
-          logInfo("Invoking sc stop from shutdown hook") 
-          sc.stop() 
+        override def run() {
+          logInfo("Invoking sc stop from shutdown hook")
+          sc.stop()
           // Best case ...
           for (master <- applicationMasters) {
             master.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
           }
-        } 
+        }
       } )
     }
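
The same string-default pattern drives the YARN timing knobs: how many times the ApplicationMaster polls for the driver and how often it heartbeats the ResourceManager. A standalone sketch of that bounded wait loop; it reads the keys from a SparkConf for self-containment and stubs the driver probe, since the real code opens a socket to spark.driver.host/port:

    import org.apache.spark.SparkConf

    object AmWaitSketch {
      private def probeDriver(): Boolean = true  // placeholder for the real socket check

      def main(args: Array[String]): Unit = {
        val conf = new SparkConf(false)
        val numTries = conf.getOrElse("spark.yarn.applicationMaster.waitTries", "10").toInt
        val heartbeatMs = conf.getOrElse("spark.yarn.scheduler.heartbeat.interval-ms", "5000").toLong

        var driverUp = false
        var tries = 0
        while (!driverUp && tries < numTries) {
          driverUp = probeDriver()
          tries += 1
          if (!driverUp) Thread.sleep(100L)
        }
        println("driver up: " + driverUp + " after " + tries +
          " tries; would heartbeat every " + heartbeatMs + " ms")
      }
    }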
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/642029e7/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index a322f60..963b5b8 100644
--- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -40,7 +40,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.ipc.YarnRPC
 import org.apache.hadoop.yarn.util.{Apps, Records}
 
-import org.apache.spark.Logging 
+import org.apache.spark.Logging
 import org.apache.spark.util.Utils
 import org.apache.spark.deploy.SparkHadoopUtil
 
@@ -150,7 +150,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
         queueInfo.getChildQueues.size))
   }
 
-  def verifyClusterResources(app: GetNewApplicationResponse) = { 
+  def verifyClusterResources(app: GetNewApplicationResponse) = {
     val maxMem = app.getMaximumResourceCapability().getMemory()
     logInfo("Max mem capabililty of a single resource in this cluster " + maxMem)
 
@@ -221,7 +221,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
       FileUtil.copy(remoteFs, originalPath, fs, newPath, false, conf)
       fs.setReplication(newPath, replication)
       if (setPerms) fs.setPermission(newPath, new FsPermission(APP_FILE_PERMISSION))
-    } 
+    }
     // Resolve any symlinks in the URI path so using a "current" symlink to point to a specific
     // version shows the specific version in the distributed cache configuration
     val qualPath = fs.makeQualified(newPath)
@@ -244,7 +244,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
       }
     }
     val dst = new Path(fs.getHomeDirectory(), appStagingDir)
-    val replication = conf.getOrElse("spark.yarn.submit.file.replication",  "3").toShort
+    val replication = conf.getOrElse("spark.yarn.submit.file.replication", "3").toShort
 
     if (UserGroupInformation.isSecurityEnabled()) {
       val dstFs = dst.getFileSystem(conf)
@@ -269,7 +269,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
         }
         val setPermissions = if (destName.equals(Client.APP_JAR)) true else false
         val destPath = copyRemoteFile(dst, new Path(localURI), replication, setPermissions)
-        distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE, 
+        distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE,
           destName, statCache)
       }
     }
@@ -283,7 +283,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
         val destPath = copyRemoteFile(dst, localPath, replication)
         // Only add the resource to the Spark ApplicationMaster.
         val appMasterOnly = true
-        distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE, 
+        distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE,
           linkname, statCache, appMasterOnly)
       }
     }
@@ -295,7 +295,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
         val localPath = new Path(localURI)
         val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
         val destPath = copyRemoteFile(dst, localPath, replication)
-        distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE, 
+        distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE,
           linkname, statCache)
       }
     }
@@ -307,7 +307,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
         val localPath = new Path(localURI)
         val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
         val destPath = copyRemoteFile(dst, localPath, replication)
-        distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.ARCHIVE, 
+        distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.ARCHIVE,
           linkname, statCache)
       }
     }
@@ -317,7 +317,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
   }
 
   def setupLaunchEnv(
-      localResources: HashMap[String, LocalResource], 
+      localResources: HashMap[String, LocalResource],
       stagingDir: String): HashMap[String, String] = {
     logInfo("Setting up the launch environment")
     val log4jConfLocalRes = localResources.getOrElse(Client.LOG4J_PROP, null)
@@ -406,11 +406,11 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
     }
 
     val commands = List[String](
-      javaCommand + 
+      javaCommand +
       " -server " +
       JAVA_OPTS +
       " " + args.amClass +
-      " --class " + args.userClass + 
+      " --class " + args.userClass +
       " --jar " + args.userJar +
       userArgsToString(args) +
       " --worker-memory " + args.workerMemory +
@@ -436,7 +436,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
     super.submitApplication(appContext)
   }
 
-  def monitorApplication(appId: ApplicationId): Boolean = {  
+  def monitorApplication(appId: ApplicationId): Boolean = {
     while (true) {
       Thread.sleep(1000)
       val report = super.getApplicationReport(appId)
@@ -458,7 +458,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
 
       val state = report.getYarnApplicationState()
       val dsStatus = report.getFinalApplicationStatus()
-      if (state == YarnApplicationState.FINISHED || 
+      if (state == YarnApplicationState.FINISHED ||
         state == YarnApplicationState.FAILED ||
         state == YarnApplicationState.KILLED) {
         return true
@@ -495,25 +495,25 @@ object Client {
     Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$())
     // If log4j present, ensure ours overrides all others
     if (addLog4j) {
-      Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() + 
+      Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
         Path.SEPARATOR + LOG4J_PROP)
     }
     // Normally the users app.jar is last in case conflicts with spark jars
-    val userClasspathFirst = conf.getOrElse("spark.yarn.user.classpath.first",  "false")
+    val userClasspathFirst = conf.getOrElse("spark.yarn.user.classpath.first", "false")
       .toBoolean
     if (userClasspathFirst) {
-      Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() + 
+      Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
         Path.SEPARATOR + APP_JAR)
     }
-    Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() + 
+    Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
       Path.SEPARATOR + SPARK_JAR)
     Client.populateHadoopClasspath(conf, env)
 
     if (!userClasspathFirst) {
-      Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() + 
+      Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
         Path.SEPARATOR + APP_JAR)
     }
-    Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() + 
+    Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
       Path.SEPARATOR + "*")
   }
 }
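
Classpath ordering in the client is keyed off spark.yarn.user.classpath.first: the app jar either precedes or follows the Spark jar and the Hadoop classpath. A simplified sketch of that ordering decision over a plain list of entries (the real code appends to the YARN container environment and resolves the jars under $PWD); the names here are placeholders:

    import org.apache.spark.SparkConf

    object ClasspathOrderSketch {
      def orderedEntries(conf: SparkConf): Seq[String] = {
        val userFirst = conf.getOrElse("spark.yarn.user.classpath.first", "false").toBoolean
        val appJar = "app.jar"                // placeholder for Client.APP_JAR
        val sparkJar = "spark.jar"            // placeholder for Client.SPARK_JAR
        val hadoopCp = "<hadoop classpath>"   // placeholder for populateHadoopClasspath
        if (userFirst) Seq(appJar, sparkJar, hadoopCp, "*")
        else Seq(sparkJar, hadoopCp, appJar, "*")
      }

      def main(args: Array[String]): Unit = {
        val conf = new SparkConf(false)
        println(orderedEntries(conf).mkString(" : "))
        conf.set("spark.yarn.user.classpath.first", "true")
        println(orderedEntries(conf).mkString(" : "))
      }
    }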

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/642029e7/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
----------------------------------------------------------------------
diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index 41ac292..1a9bb97 100644
--- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -35,7 +35,7 @@ class ClientArguments(val args: Array[String]) {
   var workerMemory = 1024 // MB
   var workerCores = 1
   var numWorkers = 2
-  var amQueue = conf.getOrElse("QUEUE",  "default")
+  var amQueue = conf.getOrElse("QUEUE", "default")
   var amMemory: Int = 512 // MB
   var amClass: String = "org.apache.spark.deploy.yarn.ApplicationMaster"
   var appName: String = "Spark"

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/642029e7/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
----------------------------------------------------------------------
diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
index b2f499e..f108c70 100644
--- a/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
@@ -35,6 +35,7 @@ import java.lang.{Class => jClass}
 import scala.reflect.api.{Mirror, TypeCreator, Universe => ApiUniverse}
 
 import org.apache.spark.Logging
+import org.apache.spark.SparkConf
 import org.apache.spark.SparkContext
 
 /** The Scala interactive shell.  It provides a read-eval-print loop
@@ -929,7 +930,7 @@ class SparkILoop(in0: Option[BufferedReader], protected val out: JPrintWriter,
   }
 
   def createSparkContext(): SparkContext = {
-    val uri = System.getenv("SPARK_EXECUTOR_URI")
+    val execUri = System.getenv("SPARK_EXECUTOR_URI")
     val master = this.master match {
       case Some(m) => m
       case None => {
@@ -938,11 +939,16 @@ class SparkILoop(in0: Option[BufferedReader], protected val out: JPrintWriter,
       }
     }
     val jars = SparkILoop.getAddedJars.map(new java.io.File(_).getAbsolutePath)
-    sparkContext = new SparkContext(master, "Spark shell", System.getenv("SPARK_HOME"), jars)
-    if (uri != null) {
-      sparkContext.conf.set("spark.executor.uri",  uri)
+    val conf = new SparkConf()
+      .setMaster(master)
+      .setAppName("Spark shell")
+      .setSparkHome(System.getenv("SPARK_HOME"))
+      .setJars(jars)
+      .set("spark.repl.class.uri", intp.classServer.uri)
+    if (execUri != null) {
+      conf.set("spark.executor.uri", execUri)
     }
-    sparkContext.conf.set("spark.repl.class.uri",  intp.classServer.uri)
+    sparkContext = new SparkContext(conf)
     echo("Created spark context..")
     sparkContext
   }
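
createSparkContext now assembles the whole SparkConf (master, app name, Spark home, jars, the REPL class-server URI) and passes it to the SparkContext constructor, with spark.executor.uri set only when the environment provides one. A stripped-down sketch of that construction; the class-server URI and jar list are placeholders, and SPARK_HOME is guarded in case it is unset:

    import org.apache.spark.SparkConf

    object ShellConfSketch {
      def main(args: Array[String]): Unit = {
        val execUri = System.getenv("SPARK_EXECUTOR_URI")
        val sparkHome = System.getenv("SPARK_HOME")
        val master = "local"                  // the shell resolves this from its options
        val jars = Seq.empty[String]          // the shell passes its added jars here
        val conf = new SparkConf()
          .setMaster(master)
          .setAppName("Spark shell")
          .setJars(jars)
          .set("spark.repl.class.uri", "http://host:port")  // placeholder for intp.classServer.uri
        if (sparkHome != null) {
          conf.setSparkHome(sparkHome)
        }
        if (execUri != null) {
          conf.set("spark.executor.uri", execUri)
        }
        println("class server uri: " + conf.get("spark.repl.class.uri"))
      }
    }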

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/642029e7/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala
----------------------------------------------------------------------
diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala b/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala
index 0d412e4..a993083 100644
--- a/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala
@@ -34,7 +34,7 @@ import scala.tools.reflect.StdRuntimeTags._
 import scala.util.control.ControlThrowable
 import util.stackTraceString
 
-import org.apache.spark.{SparkContext, HttpServer, SparkEnv, Logging}
+import org.apache.spark.{HttpServer, SparkConf, Logging}
 import org.apache.spark.util.Utils
 
 // /** directory to save .class files to */
@@ -89,7 +89,7 @@ import org.apache.spark.util.Utils
       /** Local directory to save .class files too */
       val outputDir = {
         val tmp = System.getProperty("java.io.tmpdir")
-        val rootDir = SparkContext.globalConf.getOrElse("spark.repl.classdir",  tmp)
+        val rootDir = new SparkConf().getOrElse("spark.repl.classdir",  tmp)
         Utils.createTempDir(rootDir)
       }
       if (SPARK_DEBUG_REPL) {
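
The REPL's class output directory now comes from a throwaway SparkConf rather than the global one, falling back to java.io.tmpdir. A tiny sketch of that fallback; it uses a plain File in place of Utils.createTempDir, which is internal to Spark:

    import java.io.File
    import org.apache.spark.SparkConf

    object ReplClassDirSketch {
      def main(args: Array[String]): Unit = {
        val tmp = System.getProperty("java.io.tmpdir")
        val rootDir = new SparkConf().getOrElse("spark.repl.classdir", tmp)
        val outputDir = new File(rootDir, "repl-classes-" + System.nanoTime())
        outputDir.mkdirs()
        println("REPL classes would be written under " + outputDir.getAbsolutePath)
      }
    }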

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/642029e7/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index b8e1427..f106bba 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -24,7 +24,7 @@ import java.util.concurrent.RejectedExecutionException
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.conf.Configuration
 
-import org.apache.spark.Logging
+import org.apache.spark.{SparkConf, Logging}
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.util.MetadataCleaner
 
@@ -36,12 +36,11 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
   val framework = ssc.sc.appName
   val sparkHome = ssc.sc.getSparkHome.getOrElse(null)
   val jars = ssc.sc.jars
-  val environment = ssc.sc.environment
   val graph = ssc.graph
   val checkpointDir = ssc.checkpointDir
   val checkpointDuration = ssc.checkpointDuration
   val pendingTimes = ssc.scheduler.jobManager.getPendingTimes()
-  val delaySeconds = MetadataCleaner.getDelaySeconds
+  val delaySeconds = MetadataCleaner.getDelaySeconds(ssc.conf)
   val sparkConf = ssc.sc.conf
 
   def validate() {
@@ -58,7 +57,7 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
  * Convenience class to speed up the writing of graph checkpoint to file
  */
 private[streaming]
-class CheckpointWriter(checkpointDir: String) extends Logging {
+class CheckpointWriter(conf: SparkConf, checkpointDir: String) extends Logging {
   val file = new Path(checkpointDir, "graph")
   // The file to which we actually write - and then "move" to file.
   private val writeFile = new Path(file.getParent, file.getName + ".next")
@@ -66,14 +65,14 @@ class CheckpointWriter(checkpointDir: String) extends Logging {
 
   private var stopped = false
 
-  val conf = new Configuration()
-  var fs = file.getFileSystem(conf)
+  val hadoopConf = new Configuration()
+  var fs = file.getFileSystem(hadoopConf)
   val maxAttempts = 3
   val executor = Executors.newFixedThreadPool(1)
 
-  private val compressionCodec = CompressionCodec.createCodec()
+  private val compressionCodec = CompressionCodec.createCodec(conf)
 
-  // Removed code which validates whether there is only one CheckpointWriter per path 'file' since 
+  // Removed code which validates whether there is only one CheckpointWriter per path 'file' since
   // I did not notice any errors - reintroduce it ?
 
   class CheckpointWriteHandler(checkpointTime: Time, bytes: Array[Byte]) extends Runnable {
@@ -142,11 +141,12 @@ class CheckpointWriter(checkpointDir: String) extends Logging {
 private[streaming]
 object CheckpointReader extends Logging {
 
-  def read(path: String): Checkpoint = {
+  def read(conf: SparkConf, path: String): Checkpoint = {
     val fs = new Path(path).getFileSystem(new Configuration())
-    val attempts = Seq(new Path(path, "graph"), new Path(path, "graph.bk"), new Path(path), new Path(path + ".bk"))
+    val attempts = Seq(
+      new Path(path, "graph"), new Path(path, "graph.bk"), new Path(path), new Path(path + ".bk"))
 
-    val compressionCodec = CompressionCodec.createCodec()
+    val compressionCodec = CompressionCodec.createCodec(conf)
 
     attempts.foreach(file => {
       if (fs.exists(file)) {
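
Both the checkpoint writer and reader now receive the SparkConf so their compression codec is resolved from it, while a separate Hadoop Configuration keeps handling filesystem access. A sketch of the reader's candidate-file search with the deserialization step left out; it sits in org.apache.spark.streaming in case the codec factory is not visible outside the Spark packages, and the checkpoint directory is a placeholder:

    package org.apache.spark.streaming

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.spark.SparkConf
    import org.apache.spark.io.CompressionCodec

    object CheckpointLookupSketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf(false)
        val path = "/tmp/spark-checkpoint-sketch"   // placeholder checkpoint directory
        val fs = new Path(path).getFileSystem(new Configuration())
        val attempts = Seq(
          new Path(path, "graph"), new Path(path, "graph.bk"),
          new Path(path), new Path(path + ".bk"))
        val codec = CompressionCodec.createCodec(conf)  // resolved from the SparkConf, not globals
        println("codec: " + codec.getClass.getName)
        println("existing candidates: " + attempts.filter(fs.exists).mkString(", "))
      }
    }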

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/642029e7/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
index 329d2b5..8005202 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
@@ -213,7 +213,7 @@ abstract class DStream[T: ClassTag] (
         checkpointDuration + "). Please set it to higher than " + checkpointDuration + "."
     )
 
-    val metadataCleanerDelay = MetadataCleaner.getDelaySeconds
+    val metadataCleanerDelay = MetadataCleaner.getDelaySeconds(ssc.conf)
     logInfo("metadataCleanupDelay = " + metadataCleanerDelay)
     assert(
       metadataCleanerDelay < 0 || rememberDuration.milliseconds < metadataCleanerDelay * 1000,
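
The validation now pulls the cleaner delay from the streaming context's conf via MetadataCleaner.getDelaySeconds(ssc.conf) and accepts a remember duration only if the cleaner will not discard metadata before the window expires. A pure sketch of that predicate, with the delay passed in directly since obtaining it the real way needs a StreamingContext:

    object RememberDurationCheckSketch {
      // Mirrors the DStream assertion: cleanup disabled (delay < 0), or the remember
      // window fits strictly inside the metadata cleaner's delay.
      def rememberDurationOk(delaySeconds: Long, rememberMs: Long): Boolean =
        delaySeconds < 0 || rememberMs < delaySeconds * 1000

      def main(args: Array[String]): Unit = {
        println(rememberDurationOk(delaySeconds = -1, rememberMs = 60000))  // true: cleanup disabled
        println(rememberDurationOk(delaySeconds = 30, rememberMs = 20000))  // true: 20 s < 30 s
        println(rememberDurationOk(delaySeconds = 30, rememberMs = 60000))  // false: cleaned too early
      }
    }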