You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2014/01/10 00:31:34 UTC

[1/2] git commit: Use typed getters for configuration settings

Updated Branches:
  refs/heads/master 365cac946 -> 12f414ed4


Use typed getters for configuration settings


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/a01f3401
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/a01f3401
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/a01f3401

Branch: refs/heads/master
Commit: a01f3401e32ca4324884d13c9fad53c6c87bb5f0
Parents: dceedb4
Author: Matei Zaharia <ma...@databricks.com>
Authored: Wed Jan 8 17:32:15 2014 -0500
Committer: Matei Zaharia <ma...@databricks.com>
Committed: Thu Jan 9 00:07:29 2014 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   |  4 ++--
 .../main/scala/org/apache/spark/SparkEnv.scala  |  2 +-
 .../org/apache/spark/api/python/PythonRDD.scala |  4 ++--
 .../apache/spark/broadcast/HttpBroadcast.scala  |  4 ++--
 .../spark/broadcast/TorrentBroadcast.scala      |  2 +-
 .../org/apache/spark/deploy/master/Master.scala |  6 +++---
 .../org/apache/spark/deploy/worker/Worker.scala |  2 +-
 .../org/apache/spark/io/CompressionCodec.scala  |  2 +-
 .../spark/network/ConnectionManager.scala       | 18 ++++++++--------
 .../spark/network/netty/ShuffleCopier.scala     |  2 +-
 .../org/apache/spark/rdd/CheckpointRDD.scala    |  4 ++--
 .../spark/scheduler/TaskResultGetter.scala      |  2 +-
 .../spark/scheduler/TaskSchedulerImpl.scala     |  8 +++----
 .../apache/spark/scheduler/TaskSetManager.scala |  8 +++----
 .../cluster/CoarseGrainedSchedulerBackend.scala |  5 ++---
 .../cluster/SimrSchedulerBackend.scala          |  2 +-
 .../mesos/CoarseMesosSchedulerBackend.scala     |  2 +-
 .../cluster/mesos/MesosSchedulerBackend.scala   |  2 +-
 .../spark/serializer/KryoSerializer.scala       |  4 ++--
 .../spark/storage/BlockFetcherIterator.scala    |  2 +-
 .../org/apache/spark/storage/BlockManager.scala | 22 ++++++++++----------
 .../spark/storage/BlockManagerMaster.scala      |  4 ++--
 .../apache/spark/storage/DiskBlockManager.scala |  2 +-
 .../spark/storage/ShuffleBlockManager.scala     |  4 ++--
 .../spark/ui/jobs/JobProgressListener.scala     |  2 +-
 .../scala/org/apache/spark/util/AkkaUtils.scala | 20 +++++++++---------
 .../org/apache/spark/util/MetadataCleaner.scala |  2 +-
 .../org/apache/spark/util/XORShiftRandom.scala  |  2 +-
 .../spark/scheduler/TaskSetManagerSuite.scala   |  2 +-
 .../apache/spark/util/XORShiftRandomSuite.scala |  2 +-
 .../streaming/dstream/NetworkInputDStream.scala |  4 ++--
 .../apache/spark/streaming/scheduler/Job.scala  |  2 +-
 .../streaming/scheduler/JobGenerator.scala      |  2 +-
 .../streaming/scheduler/JobScheduler.scala      |  2 +-
 34 files changed, 78 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a01f3401/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 0e47f4e..9d52544 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -116,7 +116,7 @@ class SparkContext(
     throw new SparkException("An application must be set in your configuration")
   }
 
-  if (conf.get("spark.logConf", "false").toBoolean) {
+  if (conf.getBoolean("spark.logConf", false)) {
     logInfo("Spark configuration:\n" + conf.toDebugString)
   }
 
@@ -1203,7 +1203,7 @@ object SparkContext {
       case mesosUrl @ MESOS_REGEX(_) =>
         MesosNativeLibrary.load()
         val scheduler = new TaskSchedulerImpl(sc)
-        val coarseGrained = sc.conf.get("spark.mesos.coarse", "false").toBoolean
+        val coarseGrained = sc.conf.getBoolean("spark.mesos.coarse", false)
         val url = mesosUrl.stripPrefix("mesos://") // strip scheme from raw Mesos URLs
         val backend = if (coarseGrained) {
           new CoarseMesosSchedulerBackend(scheduler, sc, url, appName)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a01f3401/core/src/main/scala/org/apache/spark/SparkEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 2e36ccb..e093e2f 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -162,7 +162,7 @@ object SparkEnv extends Logging {
         actorSystem.actorOf(Props(newActor), name = name)
       } else {
         val driverHost: String = conf.get("spark.driver.host", "localhost")
-        val driverPort: Int = conf.get("spark.driver.port", "7077").toInt
+        val driverPort: Int = conf.getInt("spark.driver.port", 7077)
         Utils.checkHost(driverHost, "Expected hostname")
         val url = s"akka.tcp://spark@$driverHost:$driverPort/user/$name"
         val timeout = AkkaUtils.lookupTimeout(conf)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a01f3401/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 32cc70e..40c519b 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -41,7 +41,7 @@ private[spark] class PythonRDD[T: ClassTag](
     accumulator: Accumulator[JList[Array[Byte]]])
   extends RDD[Array[Byte]](parent) {
 
-  val bufferSize = conf.get("spark.buffer.size", "65536").toInt
+  val bufferSize = conf.getInt("spark.buffer.size", 65536)
 
   override def getPartitions = parent.partitions
 
@@ -250,7 +250,7 @@ private class PythonAccumulatorParam(@transient serverHost: String, serverPort:
 
   Utils.checkHost(serverHost, "Expected hostname")
 
-  val bufferSize = SparkEnv.get.conf.get("spark.buffer.size", "65536").toInt
+  val bufferSize = SparkEnv.get.conf.getInt("spark.buffer.size", 65536)
 
   override def zero(value: JList[Array[Byte]]): JList[Array[Byte]] = new JArrayList
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a01f3401/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
index db596d5..0eacda3 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
@@ -92,8 +92,8 @@ private object HttpBroadcast extends Logging {
   def initialize(isDriver: Boolean, conf: SparkConf) {
     synchronized {
       if (!initialized) {
-        bufferSize = conf.get("spark.buffer.size", "65536").toInt
-        compress = conf.get("spark.broadcast.compress", "true").toBoolean
+        bufferSize = conf.getInt("spark.buffer.size", 65536)
+        compress = conf.getBoolean("spark.broadcast.compress", true)
         if (isDriver) {
           createServer(conf)
           conf.set("spark.httpBroadcast.uri",  serverUri)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a01f3401/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
index 9530938..fdf92ec 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -180,7 +180,7 @@ extends Logging {
     initialized = false
   }
 
-  lazy val BLOCK_SIZE = conf.get("spark.broadcast.blockSize", "4096").toInt * 1024
+  lazy val BLOCK_SIZE = conf.getInt("spark.broadcast.blockSize", 4096) * 1024
 
   def blockifyObject[T](obj: T): TorrentInfo = {
     val byteArray = Utils.serialize[T](obj)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a01f3401/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 6617b71..066d110 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -43,9 +43,9 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
   val conf = new SparkConf
 
   val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss")  // For application IDs
-  val WORKER_TIMEOUT = conf.get("spark.worker.timeout", "60").toLong * 1000
-  val RETAINED_APPLICATIONS = conf.get("spark.deploy.retainedApplications", "200").toInt
-  val REAPER_ITERATIONS = conf.get("spark.dead.worker.persistence", "15").toInt
+  val WORKER_TIMEOUT = conf.getLong("spark.worker.timeout", 60) * 1000
+  val RETAINED_APPLICATIONS = conf.getInt("spark.deploy.retainedApplications", 200)
+  val REAPER_ITERATIONS = conf.getInt("spark.dead.worker.persistence", 15)
   val RECOVERY_DIR = conf.get("spark.deploy.recoveryDirectory", "")
   val RECOVERY_MODE = conf.get("spark.deploy.recoveryMode", "NONE")
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a01f3401/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index fcaf4e9..0538e52 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -55,7 +55,7 @@ private[spark] class Worker(
   val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss")  // For worker and executor IDs
 
   // Send a heartbeat every (heartbeat timeout) / 4 milliseconds
-  val HEARTBEAT_MILLIS = conf.get("spark.worker.timeout", "60").toLong * 1000 / 4
+  val HEARTBEAT_MILLIS = conf.getLong("spark.worker.timeout", 60) * 1000 / 4
 
   val REGISTRATION_TIMEOUT = 20.seconds
   val REGISTRATION_RETRIES = 3

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a01f3401/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
index a1e9884..5980177 100644
--- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
+++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
@@ -71,7 +71,7 @@ class LZFCompressionCodec(conf: SparkConf) extends CompressionCodec {
 class SnappyCompressionCodec(conf: SparkConf) extends CompressionCodec {
 
   override def compressedOutputStream(s: OutputStream): OutputStream = {
-    val blockSize = conf.get("spark.io.compression.snappy.block.size", "32768").toInt
+    val blockSize = conf.getInt("spark.io.compression.snappy.block.size", 32768)
     new SnappyOutputStream(s, blockSize)
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a01f3401/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
index 46c40d0..e6e0178 100644
--- a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
+++ b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
@@ -54,22 +54,22 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf) extends Loggi
   private val selector = SelectorProvider.provider.openSelector()
 
   private val handleMessageExecutor = new ThreadPoolExecutor(
-    conf.get("spark.core.connection.handler.threads.min", "20").toInt,
-    conf.get("spark.core.connection.handler.threads.max", "60").toInt,
-    conf.get("spark.core.connection.handler.threads.keepalive", "60").toInt, TimeUnit.SECONDS,
+    conf.getInt("spark.core.connection.handler.threads.min", 20),
+    conf.getInt("spark.core.connection.handler.threads.max", 60),
+    conf.getInt("spark.core.connection.handler.threads.keepalive", 60), TimeUnit.SECONDS,
     new LinkedBlockingDeque[Runnable]())
 
   private val handleReadWriteExecutor = new ThreadPoolExecutor(
-    conf.get("spark.core.connection.io.threads.min", "4").toInt,
-    conf.get("spark.core.connection.io.threads.max", "32").toInt,
-    conf.get("spark.core.connection.io.threads.keepalive", "60").toInt, TimeUnit.SECONDS,
+    conf.getInt("spark.core.connection.io.threads.min", 4),
+    conf.getInt("spark.core.connection.io.threads.max", 32),
+    conf.getInt("spark.core.connection.io.threads.keepalive", 60), TimeUnit.SECONDS,
     new LinkedBlockingDeque[Runnable]())
 
   // Use a different, yet smaller, thread pool - infrequently used with very short lived tasks : which should be executed asap
   private val handleConnectExecutor = new ThreadPoolExecutor(
-    conf.get("spark.core.connection.connect.threads.min", "1").toInt,
-    conf.get("spark.core.connection.connect.threads.max", "8").toInt,
-    conf.get("spark.core.connection.connect.threads.keepalive", "60").toInt, TimeUnit.SECONDS,
+    conf.getInt("spark.core.connection.connect.threads.min", 1),
+    conf.getInt("spark.core.connection.connect.threads.max", 8),
+    conf.getInt("spark.core.connection.connect.threads.keepalive", 60), TimeUnit.SECONDS,
     new LinkedBlockingDeque[Runnable]())
 
   private val serverChannel = ServerSocketChannel.open()

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a01f3401/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala b/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala
index b729eb1..d87157e 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala
@@ -36,7 +36,7 @@ private[spark] class ShuffleCopier(conf: SparkConf) extends Logging {
       resultCollectCallback: (BlockId, Long, ByteBuf) => Unit) {
 
     val handler = new ShuffleCopier.ShuffleClientHandler(resultCollectCallback)
-    val connectTimeout = conf.get("spark.shuffle.netty.connect.timeout", "60000").toInt
+    val connectTimeout = conf.getInt("spark.shuffle.netty.connect.timeout", 60000)
     val fc = new FileClient(handler, connectTimeout)
 
     try {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a01f3401/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
index 6d4f461..83109d1 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
@@ -97,7 +97,7 @@ private[spark] object CheckpointRDD extends Logging {
       throw new IOException("Checkpoint failed: temporary path " +
         tempOutputPath + " already exists")
     }
-    val bufferSize = env.conf.get("spark.buffer.size", "65536").toInt
+    val bufferSize = env.conf.getInt("spark.buffer.size", 65536)
 
     val fileOutputStream = if (blockSize < 0) {
       fs.create(tempOutputPath, false, bufferSize)
@@ -131,7 +131,7 @@ private[spark] object CheckpointRDD extends Logging {
     ): Iterator[T] = {
     val env = SparkEnv.get
     val fs = path.getFileSystem(broadcastedConf.value.value)
-    val bufferSize = env.conf.get("spark.buffer.size", "65536").toInt
+    val bufferSize = env.conf.getInt("spark.buffer.size", 65536)
     val fileInputStream = fs.open(path, bufferSize)
     val serializer = env.serializer.newInstance()
     val deserializeStream = serializer.deserializeStream(fileInputStream)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a01f3401/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
index e22b1e5..c52d617 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
@@ -31,7 +31,7 @@ import org.apache.spark.util.Utils
 private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedulerImpl)
   extends Logging {
 
-  private val THREADS = sparkEnv.conf.get("spark.resultGetter.threads", "4").toInt
+  private val THREADS = sparkEnv.conf.getInt("spark.resultGetter.threads", 4)
   private val getTaskResultExecutor = Utils.newDaemonFixedThreadPool(
     THREADS, "Result resolver thread")
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a01f3401/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 0c8ed62..d4f74d3 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -51,15 +51,15 @@ private[spark] class TaskSchedulerImpl(
     isLocal: Boolean = false)
   extends TaskScheduler with Logging
 {
-  def this(sc: SparkContext) = this(sc, sc.conf.get("spark.task.maxFailures", "4").toInt)
+  def this(sc: SparkContext) = this(sc, sc.conf.getInt("spark.task.maxFailures", 4))
 
   val conf = sc.conf
 
   // How often to check for speculative tasks
-  val SPECULATION_INTERVAL = conf.get("spark.speculation.interval", "100").toLong
+  val SPECULATION_INTERVAL = conf.getLong("spark.speculation.interval", 100)
 
   // Threshold above which we warn user initial TaskSet may be starved
-  val STARVATION_TIMEOUT = conf.get("spark.starvation.timeout", "15000").toLong
+  val STARVATION_TIMEOUT = conf.getLong("spark.starvation.timeout", 15000)
 
   // TaskSetManagers are not thread safe, so any access to one should be synchronized
   // on this class.
@@ -125,7 +125,7 @@ private[spark] class TaskSchedulerImpl(
   override def start() {
     backend.start()
 
-    if (!isLocal && conf.get("spark.speculation", "false").toBoolean) {
+    if (!isLocal && conf.getBoolean("spark.speculation", false)) {
       logInfo("Starting speculative execution thread")
       import sc.env.actorSystem.dispatcher
       sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds,

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a01f3401/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 6dd1469..a10e539 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -57,11 +57,11 @@ private[spark] class TaskSetManager(
   val conf = sched.sc.conf
 
   // CPUs to request per task
-  val CPUS_PER_TASK = conf.get("spark.task.cpus", "1").toInt
+  val CPUS_PER_TASK = conf.getInt("spark.task.cpus", 1)
 
   // Quantile of tasks at which to start speculation
-  val SPECULATION_QUANTILE = conf.get("spark.speculation.quantile", "0.75").toDouble
-  val SPECULATION_MULTIPLIER = conf.get("spark.speculation.multiplier", "1.5").toDouble
+  val SPECULATION_QUANTILE = conf.getDouble("spark.speculation.quantile", 0.75)
+  val SPECULATION_MULTIPLIER = conf.getDouble("spark.speculation.multiplier", 1.5)
 
   // Serializer for closures and tasks.
   val env = SparkEnv.get
@@ -116,7 +116,7 @@ private[spark] class TaskSetManager(
 
   // How frequently to reprint duplicate exceptions in full, in milliseconds
   val EXCEPTION_PRINT_INTERVAL =
-    conf.get("spark.logging.exceptionPrintInterval", "10000").toLong
+    conf.getLong("spark.logging.exceptionPrintInterval", 10000)
 
   // Map of recent exceptions (identified by string representation and top stack frame) to
   // duplicate count (how many times the same exception has appeared) and time the full exception

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a01f3401/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 2f5bcaf..8d596a7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -63,7 +63,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
       context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
 
       // Periodically revive offers to allow delay scheduling to work
-      val reviveInterval = conf.get("spark.scheduler.revive.interval", "1000").toLong
+      val reviveInterval = conf.getLong("spark.scheduler.revive.interval", 1000)
       import context.dispatcher
       context.system.scheduler.schedule(0.millis, reviveInterval.millis, self, ReviveOffers)
     }
@@ -209,8 +209,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
   }
 
   override def defaultParallelism(): Int = {
-    conf.getOption("spark.default.parallelism").map(_.toInt).getOrElse(
-      math.max(totalCoreCount.get(), 2))
+    conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2))
   }
 
   // Called by subclasses when notified of a lost worker

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a01f3401/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
index b44d1e4..d99c761 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
@@ -33,7 +33,7 @@ private[spark] class SimrSchedulerBackend(
   val tmpPath = new Path(driverFilePath + "_tmp")
   val filePath = new Path(driverFilePath)
 
-  val maxCores = conf.get("spark.simr.executor.cores", "1").toInt
+  val maxCores = conf.getInt("spark.simr.executor.cores", 1)
 
   override def start() {
     super.start()

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a01f3401/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index d46fceb..e16d60c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -77,7 +77,7 @@ private[spark] class CoarseMesosSchedulerBackend(
     "Spark home is not set; set it through the spark.home system " +
     "property, the SPARK_HOME environment variable or the SparkContext constructor"))
 
-  val extraCoresPerSlave = conf.get("spark.mesos.extra.cores", "0").toInt
+  val extraCoresPerSlave = conf.getInt("spark.mesos.extra.cores", 0)
 
   var nextMesosTaskId = 0
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a01f3401/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index ae8d527..b428c82 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -340,5 +340,5 @@ private[spark] class MesosSchedulerBackend(
   }
 
   // TODO: query Mesos for number of cores
-  override def defaultParallelism() = sc.conf.get("spark.default.parallelism", "8").toInt
+  override def defaultParallelism() = sc.conf.getInt("spark.default.parallelism", 8)
 }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a01f3401/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index a24a3b0..c14cd47 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -36,7 +36,7 @@ import org.apache.spark.storage.{GetBlock, GotBlock, PutBlock}
  */
 class KryoSerializer(conf: SparkConf) extends org.apache.spark.serializer.Serializer with Logging {
   private val bufferSize = {
-    conf.get("spark.kryoserializer.buffer.mb", "2").toInt * 1024 * 1024
+    conf.getInt("spark.kryoserializer.buffer.mb", 2) * 1024 * 1024
   }
 
   def newKryoOutput() = new KryoOutput(bufferSize)
@@ -48,7 +48,7 @@ class KryoSerializer(conf: SparkConf) extends org.apache.spark.serializer.Serial
 
     // Allow disabling Kryo reference tracking if user knows their object graphs don't have loops.
     // Do this before we invoke the user registrator so the user registrator can override this.
-    kryo.setReferences(conf.get("spark.kryo.referenceTracking", "true").toBoolean)
+    kryo.setReferences(conf.getBoolean("spark.kryo.referenceTracking", true))
 
     for (cls <- KryoSerializer.toRegister) kryo.register(cls)
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a01f3401/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
index 4747863..4fa2ab9 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
@@ -327,7 +327,7 @@ object BlockFetcherIterator {
         fetchRequestsSync.put(request)
       }
 
-      copiers = startCopiers(conf.get("spark.shuffle.copier.threads", "6").toInt)
+      copiers = startCopiers(conf.getInt("spark.shuffle.copier.threads", 6))
       logInfo("Started " + fetchRequestsSync.size + " remote gets in " +
         Utils.getUsedTimeMs(startTime))
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a01f3401/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 6d2cda9..c56e2ca 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -58,8 +58,8 @@ private[spark] class BlockManager(
 
   // If we use Netty for shuffle, start a new Netty-based shuffle sender service.
   private val nettyPort: Int = {
-    val useNetty = conf.get("spark.shuffle.use.netty", "false").toBoolean
-    val nettyPortConfig = conf.get("spark.shuffle.sender.port", "0").toInt
+    val useNetty = conf.getBoolean("spark.shuffle.use.netty", false)
+    val nettyPortConfig = conf.getInt("spark.shuffle.sender.port", 0)
     if (useNetty) diskBlockManager.startShuffleBlockSender(nettyPortConfig) else 0
   }
 
@@ -72,14 +72,14 @@ private[spark] class BlockManager(
   // Max megabytes of data to keep in flight per reducer (to avoid over-allocating memory
   // for receiving shuffle outputs)
   val maxBytesInFlight =
-    conf.get("spark.reducer.maxMbInFlight", "48").toLong * 1024 * 1024
+    conf.getLong("spark.reducer.maxMbInFlight", 48) * 1024 * 1024
 
   // Whether to compress broadcast variables that are stored
-  val compressBroadcast = conf.get("spark.broadcast.compress", "true").toBoolean
+  val compressBroadcast = conf.getBoolean("spark.broadcast.compress", true)
   // Whether to compress shuffle output that are stored
-  val compressShuffle = conf.get("spark.shuffle.compress", "true").toBoolean
+  val compressShuffle = conf.getBoolean("spark.shuffle.compress", true)
   // Whether to compress RDD partitions that are stored serialized
-  val compressRdds = conf.get("spark.rdd.compress", "false").toBoolean
+  val compressRdds = conf.getBoolean("spark.rdd.compress", false)
 
   val heartBeatFrequency = BlockManager.getHeartBeatFrequency(conf)
 
@@ -443,7 +443,7 @@ private[spark] class BlockManager(
       : BlockFetcherIterator = {
 
     val iter =
-      if (conf.get("spark.shuffle.use.netty", "false").toBoolean) {
+      if (conf.getBoolean("spark.shuffle.use.netty", false)) {
         new BlockFetcherIterator.NettyBlockFetcherIterator(this, blocksByAddress, serializer)
       } else {
         new BlockFetcherIterator.BasicBlockFetcherIterator(this, blocksByAddress, serializer)
@@ -469,7 +469,7 @@ private[spark] class BlockManager(
   def getDiskWriter(blockId: BlockId, file: File, serializer: Serializer, bufferSize: Int)
     : BlockObjectWriter = {
     val compressStream: OutputStream => OutputStream = wrapForCompression(blockId, _)
-    val syncWrites = conf.get("spark.shuffle.sync", "false").toBoolean
+    val syncWrites = conf.getBoolean("spark.shuffle.sync", false)
     new DiskBlockObjectWriter(blockId, file, serializer, bufferSize, compressStream, syncWrites)
   }
 
@@ -864,15 +864,15 @@ private[spark] object BlockManager extends Logging {
   val ID_GENERATOR = new IdGenerator
 
   def getMaxMemory(conf: SparkConf): Long = {
-    val memoryFraction = conf.get("spark.storage.memoryFraction", "0.66").toDouble
+    val memoryFraction = conf.getDouble("spark.storage.memoryFraction", 0.66)
     (Runtime.getRuntime.maxMemory * memoryFraction).toLong
   }
 
   def getHeartBeatFrequency(conf: SparkConf): Long =
-    conf.get("spark.storage.blockManagerTimeoutIntervalMs", "60000").toLong / 4
+    conf.getLong("spark.storage.blockManagerTimeoutIntervalMs", 60000) / 4
 
   def getDisableHeartBeatsForTesting(conf: SparkConf): Boolean =
-    conf.get("spark.test.disableBlockManagerHeartBeat", "false").toBoolean
+    conf.getBoolean("spark.test.disableBlockManagerHeartBeat", false)
 
   /**
    * Attempt to clean up a ByteBuffer if it is memory-mapped. This uses an *unsafe* Sun API that

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a01f3401/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index 51a29ed..c54e4f2 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -30,8 +30,8 @@ import org.apache.spark.util.AkkaUtils
 private[spark]
 class BlockManagerMaster(var driverActor : ActorRef, conf: SparkConf) extends Logging {
 
-  val AKKA_RETRY_ATTEMPTS: Int = conf.get("spark.akka.num.retries", "3").toInt
-  val AKKA_RETRY_INTERVAL_MS: Int = conf.get("spark.akka.retry.wait", "3000").toInt
+  val AKKA_RETRY_ATTEMPTS: Int = conf.getInt("spark.akka.num.retries", 3)
+  val AKKA_RETRY_INTERVAL_MS: Int = conf.getInt("spark.akka.retry.wait", 3000)
 
   val DRIVER_AKKA_ACTOR_NAME = "BlockManagerMaster"
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a01f3401/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index 55dcb37..edc1133 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -38,7 +38,7 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD
   extends PathResolver with Logging {
 
   private val MAX_DIR_CREATION_ATTEMPTS: Int = 10
-  private val subDirsPerLocalDir = shuffleManager.conf.get("spark.diskStore.subDirectories", "64").toInt
+  private val subDirsPerLocalDir = shuffleManager.conf.getInt("spark.diskStore.subDirectories", 64)
 
   // Create one local directory for each path mentioned in spark.local.dir; then, inside this
   // directory, create multiple subdirectories that we will hash files into, in order to avoid

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a01f3401/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
index 39dc7bb..e2b2429 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
@@ -64,9 +64,9 @@ class ShuffleBlockManager(blockManager: BlockManager) {
   // Turning off shuffle file consolidation causes all shuffle Blocks to get their own file.
   // TODO: Remove this once the shuffle file consolidation feature is stable.
   val consolidateShuffleFiles =
-    conf.get("spark.shuffle.consolidateFiles", "false").toBoolean
+    conf.getBoolean("spark.shuffle.consolidateFiles", false)
 
-  private val bufferSize = conf.get("spark.shuffle.file.buffer.kb", "100").toInt * 1024
+  private val bufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024
 
   /**
    * Contains all the state related to a particular shuffle. This includes a pool of unused

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a01f3401/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index d6d9f0c..bcd2824 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -33,7 +33,7 @@ import org.apache.spark.scheduler._
  */
 private[spark] class JobProgressListener(val sc: SparkContext) extends SparkListener {
   // How many stages to remember
-  val RETAINED_STAGES = sc.conf.get("spark.ui.retainedStages", "1000").toInt
+  val RETAINED_STAGES = sc.conf.getInt("spark.ui.retainedStages", 1000)
   val DEFAULT_POOL_NAME = "default"
 
   val stageIdToPool = new HashMap[Int, String]()

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a01f3401/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index 3f009a8..761d378 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -44,13 +44,13 @@ private[spark] object AkkaUtils {
   def createActorSystem(name: String, host: String, port: Int, indestructible: Boolean = false,
     conf: SparkConf): (ActorSystem, Int) = {
 
-    val akkaThreads   = conf.get("spark.akka.threads", "4").toInt
-    val akkaBatchSize = conf.get("spark.akka.batchSize", "15").toInt
+    val akkaThreads   = conf.getInt("spark.akka.threads", 4)
+    val akkaBatchSize = conf.getInt("spark.akka.batchSize", 15)
 
-    val akkaTimeout = conf.get("spark.akka.timeout", "100").toInt
+    val akkaTimeout = conf.getInt("spark.akka.timeout", 100)
 
-    val akkaFrameSize = conf.get("spark.akka.frameSize", "10").toInt
-    val akkaLogLifecycleEvents = conf.get("spark.akka.logLifecycleEvents", "false").toBoolean
+    val akkaFrameSize = conf.getInt("spark.akka.frameSize", 10)
+    val akkaLogLifecycleEvents = conf.getBoolean("spark.akka.logLifecycleEvents", false)
     val lifecycleEvents = if (akkaLogLifecycleEvents) "on" else "off"
     if (!akkaLogLifecycleEvents) {
       // As a workaround for Akka issue #3787, we coerce the "EndpointWriter" log to be silent.
@@ -58,12 +58,12 @@ private[spark] object AkkaUtils {
       Option(Logger.getLogger("akka.remote.EndpointWriter")).map(l => l.setLevel(Level.FATAL))
     }
 
-    val logAkkaConfig = if (conf.get("spark.akka.logAkkaConfig", "false").toBoolean) "on" else "off"
+    val logAkkaConfig = if (conf.getBoolean("spark.akka.logAkkaConfig", false)) "on" else "off"
 
-    val akkaHeartBeatPauses = conf.get("spark.akka.heartbeat.pauses", "600").toInt
+    val akkaHeartBeatPauses = conf.getInt("spark.akka.heartbeat.pauses", 600)
     val akkaFailureDetector =
-      conf.get("spark.akka.failure-detector.threshold", "300.0").toDouble
-    val akkaHeartBeatInterval = conf.get("spark.akka.heartbeat.interval", "1000").toInt
+      conf.getDouble("spark.akka.failure-detector.threshold", 300.0)
+    val akkaHeartBeatInterval = conf.getInt("spark.akka.heartbeat.interval", 1000)
 
     val akkaConf = ConfigFactory.parseMap(conf.getAkkaConf.toMap[String, String]).withFallback(
       ConfigFactory.parseString(
@@ -103,7 +103,7 @@ private[spark] object AkkaUtils {
 
   /** Returns the default Spark timeout to use for Akka ask operations. */
   def askTimeout(conf: SparkConf): FiniteDuration = {
-    Duration.create(conf.get("spark.akka.askTimeout", "30").toLong, "seconds")
+    Duration.create(conf.getLong("spark.akka.askTimeout", 30), "seconds")
   }
 
   /** Returns the default Spark timeout to use for Akka remote actor lookup. */

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a01f3401/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
index aa7f52c..3d1e90a 100644
--- a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
@@ -74,7 +74,7 @@ object MetadataCleanerType extends Enumeration {
 // initialization of StreamingContext. It's okay for users trying to configure stuff themselves.
 object MetadataCleaner {
   def getDelaySeconds(conf: SparkConf) = {
-    conf.get("spark.cleaner.ttl", "3500").toInt
+    conf.getInt("spark.cleaner.ttl", 3500)
   }
 
   def getDelaySeconds(conf: SparkConf, cleanerType: MetadataCleanerType.MetadataCleanerType): Int =

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a01f3401/core/src/main/scala/org/apache/spark/util/XORShiftRandom.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/XORShiftRandom.scala b/core/src/main/scala/org/apache/spark/util/XORShiftRandom.scala
index e9907e6..08b31ac 100644
--- a/core/src/main/scala/org/apache/spark/util/XORShiftRandom.scala
+++ b/core/src/main/scala/org/apache/spark/util/XORShiftRandom.scala
@@ -91,4 +91,4 @@ private[spark] object XORShiftRandom {
 
   }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a01f3401/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index 1eec672..c9f6cc5 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -83,7 +83,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
 
   private val conf = new SparkConf
 
-  val LOCALITY_WAIT = conf.get("spark.locality.wait", "3000").toLong
+  val LOCALITY_WAIT = conf.getLong("spark.locality.wait", 3000)
   val MAX_TASK_FAILURES = 4
 
   test("TaskSet with no preferences") {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a01f3401/core/src/test/scala/org/apache/spark/util/XORShiftRandomSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/XORShiftRandomSuite.scala b/core/src/test/scala/org/apache/spark/util/XORShiftRandomSuite.scala
index b78367b..f1d7b61 100644
--- a/core/src/test/scala/org/apache/spark/util/XORShiftRandomSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/XORShiftRandomSuite.scala
@@ -73,4 +73,4 @@ class XORShiftRandomSuite extends FunSuite with ShouldMatchers {
 
   }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a01f3401/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
index 27d474c..d41f726 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
@@ -175,7 +175,7 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
   private class NetworkReceiverActor extends Actor {
     logInfo("Attempting to register with tracker")
     val ip = env.conf.get("spark.driver.host", "localhost")
-    val port = env.conf.get("spark.driver.port", "7077").toInt
+    val port = env.conf.getInt("spark.driver.port", 7077)
     val url = "akka.tcp://spark@%s:%s/user/NetworkInputTracker".format(ip, port)
     val tracker = env.actorSystem.actorSelection(url)
     val timeout = 5.seconds
@@ -212,7 +212,7 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
     case class Block(id: BlockId, buffer: ArrayBuffer[T], metadata: Any = null)
 
     val clock = new SystemClock()
-    val blockInterval = env.conf.get("spark.streaming.blockInterval", "200").toLong
+    val blockInterval = env.conf.getLong("spark.streaming.blockInterval", 200)
     val blockIntervalTimer = new RecurringTimer(clock, blockInterval, updateCurrentBuffer)
     val blockStorageLevel = storageLevel
     val blocksForPushing = new ArrayBlockingQueue[Block](1000)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a01f3401/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala
index 7341bfb..c8ee93b 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala
@@ -38,4 +38,4 @@ class Job(val time: Time, func: () => _) {
   }
 
   override def toString = id
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a01f3401/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index 5f8be93..3c624e8 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@ -104,7 +104,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
     // or if the property is defined set it to that time
     if (clock.isInstanceOf[ManualClock]) {
       val lastTime = ssc.initialCheckpoint.checkpointTime.milliseconds
-      val jumpTime = ssc.sc.conf.get("spark.streaming.manualClock.jump", "0").toLong
+      val jumpTime = ssc.sc.conf.getLong("spark.streaming.manualClock.jump", 0)
       clock.asInstanceOf[ManualClock].setTime(lastTime + jumpTime)
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a01f3401/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 9304fc1..30c070c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -31,7 +31,7 @@ private[streaming]
 class JobScheduler(val ssc: StreamingContext) extends Logging {
 
   val jobSets = new ConcurrentHashMap[Time, JobSet]
-  val numConcurrentJobs = ssc.conf.get("spark.streaming.concurrentJobs", "1").toInt
+  val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1)
   val executor = Executors.newFixedThreadPool(numConcurrentJobs)
   val generator = new JobGenerator(this)
   val listenerBus = new StreamingListenerBus()


[2/2] git commit: Merge pull request #362 from mateiz/conf-getters

Posted by ma...@apache.org.
Merge pull request #362 from mateiz/conf-getters

Use typed getters for configuration settings

This improves some of the code style after SPARK-544.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/12f414ed
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/12f414ed
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/12f414ed

Branch: refs/heads/master
Commit: 12f414ed43c54db706c8bc3e71f9b84465314ac2
Parents: 365cac9 a01f340
Author: Matei Zaharia <ma...@databricks.com>
Authored: Thu Jan 9 15:31:30 2014 -0800
Committer: Matei Zaharia <ma...@databricks.com>
Committed: Thu Jan 9 15:31:30 2014 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   |  4 ++--
 .../main/scala/org/apache/spark/SparkEnv.scala  |  2 +-
 .../org/apache/spark/api/python/PythonRDD.scala |  4 ++--
 .../apache/spark/broadcast/HttpBroadcast.scala  |  4 ++--
 .../spark/broadcast/TorrentBroadcast.scala      |  2 +-
 .../org/apache/spark/deploy/master/Master.scala |  6 +++---
 .../org/apache/spark/deploy/worker/Worker.scala |  2 +-
 .../org/apache/spark/io/CompressionCodec.scala  |  2 +-
 .../spark/network/ConnectionManager.scala       | 18 ++++++++--------
 .../spark/network/netty/ShuffleCopier.scala     |  2 +-
 .../org/apache/spark/rdd/CheckpointRDD.scala    |  4 ++--
 .../spark/scheduler/TaskResultGetter.scala      |  2 +-
 .../spark/scheduler/TaskSchedulerImpl.scala     |  8 +++----
 .../apache/spark/scheduler/TaskSetManager.scala |  8 +++----
 .../cluster/CoarseGrainedSchedulerBackend.scala |  5 ++---
 .../cluster/SimrSchedulerBackend.scala          |  2 +-
 .../mesos/CoarseMesosSchedulerBackend.scala     |  2 +-
 .../cluster/mesos/MesosSchedulerBackend.scala   |  2 +-
 .../spark/serializer/KryoSerializer.scala       |  4 ++--
 .../spark/storage/BlockFetcherIterator.scala    |  2 +-
 .../org/apache/spark/storage/BlockManager.scala | 22 ++++++++++----------
 .../spark/storage/BlockManagerMaster.scala      |  4 ++--
 .../apache/spark/storage/DiskBlockManager.scala |  2 +-
 .../spark/storage/ShuffleBlockManager.scala     |  4 ++--
 .../spark/ui/jobs/JobProgressListener.scala     |  2 +-
 .../scala/org/apache/spark/util/AkkaUtils.scala | 20 +++++++++---------
 .../org/apache/spark/util/MetadataCleaner.scala |  2 +-
 .../org/apache/spark/util/XORShiftRandom.scala  |  2 +-
 .../spark/scheduler/TaskSetManagerSuite.scala   |  2 +-
 .../apache/spark/util/XORShiftRandomSuite.scala |  2 +-
 .../streaming/dstream/NetworkInputDStream.scala |  4 ++--
 .../apache/spark/streaming/scheduler/Job.scala  |  2 +-
 .../streaming/scheduler/JobGenerator.scala      |  2 +-
 .../streaming/scheduler/JobScheduler.scala      |  2 +-
 34 files changed, 78 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/12f414ed/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------