You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by an...@apache.org on 2015/04/14 01:28:12 UTC

spark git commit: [SPARK-5931][CORE] Use consistent naming for time properties

Repository: spark
Updated Branches:
  refs/heads/master c5602bdc3 -> c4ab255e9


[SPARK-5931][CORE] Use consistent naming for time properties

I've added new utility methods to do the conversion from times specified as e.g. 120s, 240ms, 360us to convert to a consistent internal representation. I've updated usage of these constants throughout the code to be consistent.

I believe I've captured all usages of time-based properties throughout the code. I've also updated variable names in a number of places to reflect their units for clarity and updated documentation where appropriate.

Author: Ilya Ganelin <il...@capitalone.com>
Author: Ilya Ganelin <il...@gmail.com>

Closes #5236 from ilganeli/SPARK-5931 and squashes the following commits:

4526c81 [Ilya Ganelin] Update configuration.md
de3bff9 [Ilya Ganelin] Fixing style errors
f5fafcd [Ilya Ganelin] Doc updates
951ca2d [Ilya Ganelin] Made the most recent round of changes
bc04e05 [Ilya Ganelin] Minor fixes and doc updates
25d3f52 [Ilya Ganelin] Minor nit fixes
642a06d [Ilya Ganelin] Fixed logic for invalid suffixes and addid matching test
8927e66 [Ilya Ganelin] Fixed handling of -1
69fedcc [Ilya Ganelin] Added test for zero
dc7bd08 [Ilya Ganelin] Fixed error in exception handling
7d19cdd [Ilya Ganelin] Added fix for possible NPE
6f651a8 [Ilya Ganelin] Now using regexes to simplify code in parseTimeString. Introduces getTimeAsSec and getTimeAsMs methods in SparkConf. Updated documentation
cbd2ca6 [Ilya Ganelin] Formatting error
1a1122c [Ilya Ganelin] Formatting fixes and added m for use as minute formatter
4e48679 [Ilya Ganelin] Fixed priority order and mixed up conversions in a couple spots
d4efd26 [Ilya Ganelin] Added time conversion for yarn.scheduler.heartbeat.interval-ms
cbf41db [Ilya Ganelin] Got rid of thrown exceptions
1465390 [Ilya Ganelin] Nit
28187bf [Ilya Ganelin] Convert straight to seconds
ff40bfe [Ilya Ganelin] Updated tests to fix small bugs
19c31af [Ilya Ganelin] Added cleaner computation of time conversions in tests
6387772 [Ilya Ganelin] Updated suffix handling to handle overlap of units more gracefully
5193d5f [Ilya Ganelin] Resolved merge conflicts
76cfa27 [Ilya Ganelin] [SPARK-5931] Minor nit fixes'
bf779b0 [Ilya Ganelin] Special handling of overlapping usffixes for java
dd0a680 [Ilya Ganelin] Updated scala code to call into java
b2fc965 [Ilya Ganelin] replaced get or default since it's not present in this version of java
39164f9 [Ilya Ganelin] [SPARK-5931] Updated Java conversion to be similar to scala conversion. Updated conversions to clean up code a little using TimeUnit.convert. Added Unit tests
3b126e1 [Ilya Ganelin] Fixed conversion to US from seconds
1858197 [Ilya Ganelin] Fixed bug where all time was being converted to us instead of the appropriate units
bac9edf [Ilya Ganelin] More whitespace
8613631 [Ilya Ganelin] Whitespace
1c0c07c [Ilya Ganelin] Updated Java code to add day, minutes, and hours
647b5ac [Ilya Ganelin] Udpated time conversion to use map iterator instead of if fall through
70ac213 [Ilya Ganelin] Fixed remaining usages to be consistent. Updated Java-side time conversion
68f4e93 [Ilya Ganelin] Updated more files to clean up usage of default time strings
3a12dd8 [Ilya Ganelin] Updated host revceiver
5232a36 [Ilya Ganelin] [SPARK-5931] Changed default behavior of time string conversion.
499bdf0 [Ilya Ganelin] Merge branch 'SPARK-5931' of github.com:ilganeli/spark into SPARK-5931
9e2547c [Ilya Ganelin] Reverting doc changes
8f741e1 [Ilya Ganelin] Update JavaUtils.java
34f87c2 [Ilya Ganelin] Update Utils.scala
9a29d8d [Ilya Ganelin] Fixed misuse of time in streaming context test
42477aa [Ilya Ganelin] Updated configuration doc with note on specifying time properties
cde9bff [Ilya Ganelin] Updated spark.streaming.blockInterval
c6a0095 [Ilya Ganelin] Updated spark.core.connection.auth.wait.timeout
5181597 [Ilya Ganelin] Updated spark.dynamicAllocation.schedulerBacklogTimeout
2fcc91c [Ilya Ganelin] Updated spark.dynamicAllocation.executorIdleTimeout
6d1518e [Ilya Ganelin] Upated spark.speculation.interval
3f1cfc8 [Ilya Ganelin] Updated spark.scheduler.revive.interval
3352d34 [Ilya Ganelin] Updated spark.scheduler.maxRegisteredResourcesWaitingTime
272c215 [Ilya Ganelin] Updated spark.locality.wait
7320c87 [Ilya Ganelin] updated spark.akka.heartbeat.interval
064ebd6 [Ilya Ganelin] Updated usage of spark.cleaner.ttl
21ef3dd [Ilya Ganelin] updated spark.shuffle.sasl.timeout
c9f5cad [Ilya Ganelin] Updated spark.shuffle.io.retryWait
4933fda [Ilya Ganelin] Updated usage of spark.storage.blockManagerSlaveTimeout
7db6d2a [Ilya Ganelin] Updated usage of spark.akka.timeout
404f8c3 [Ilya Ganelin] Updated usage of spark.core.connection.ack.wait.timeout
59bf9e1 [Ilya Ganelin] [SPARK-5931] Updated Utils and JavaUtils classes to add helper methods to handle time strings. Updated time strings in a few places to properly parse time


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c4ab255e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c4ab255e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c4ab255e

Branch: refs/heads/master
Commit: c4ab255e94366ba9b9023d5431f9d2412e0d6dc7
Parents: c5602bd
Author: Ilya Ganelin <il...@capitalone.com>
Authored: Mon Apr 13 16:28:07 2015 -0700
Committer: Andrew Or <an...@databricks.com>
Committed: Mon Apr 13 16:28:07 2015 -0700

----------------------------------------------------------------------
 .../spark/ExecutorAllocationManager.scala       | 34 ++++----
 .../org/apache/spark/HeartbeatReceiver.scala    | 15 ++--
 .../main/scala/org/apache/spark/SparkConf.scala | 36 ++++++++
 .../org/apache/spark/executor/Executor.scala    |  6 +-
 .../spark/network/nio/ConnectionManager.scala   |  3 +-
 .../spark/scheduler/TaskSchedulerImpl.scala     | 10 +--
 .../apache/spark/scheduler/TaskSetManager.scala | 21 +++--
 .../cluster/CoarseGrainedSchedulerBackend.scala | 13 +--
 .../scala/org/apache/spark/util/AkkaUtils.scala | 14 ++--
 .../org/apache/spark/util/MetadataCleaner.scala |  2 +-
 .../scala/org/apache/spark/util/Utils.scala     | 26 +++++-
 .../spark/ExecutorAllocationManagerSuite.scala  |  7 +-
 .../network/nio/ConnectionManagerSuite.scala    |  8 +-
 .../spark/scheduler/TaskSetManagerSuite.scala   | 25 +++---
 .../storage/BlockManagerReplicationSuite.scala  |  2 +-
 .../org/apache/spark/util/UtilsSuite.scala      | 44 ++++++++++
 docs/configuration.md                           | 86 +++++++++++---------
 docs/running-on-yarn.md                         |  4 +-
 .../apache/spark/network/util/JavaUtils.java    | 66 +++++++++++++++
 .../spark/network/util/TransportConf.java       | 15 +++-
 .../streaming/receiver/BlockGenerator.scala     |  8 +-
 .../streaming/scheduler/JobGenerator.scala      | 12 ++-
 .../apache/spark/streaming/ReceiverSuite.scala  | 14 ++--
 .../spark/streaming/StreamingContextSuite.scala | 22 ++---
 .../spark/deploy/yarn/ApplicationMaster.scala   |  9 +-
 25 files changed, 345 insertions(+), 157 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c4ab255e/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 9385f55..4e7bf51 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -80,16 +80,16 @@ private[spark] class ExecutorAllocationManager(
     Integer.MAX_VALUE)
 
   // How long there must be backlogged tasks for before an addition is triggered (seconds)
-  private val schedulerBacklogTimeout = conf.getLong(
-    "spark.dynamicAllocation.schedulerBacklogTimeout", 5)
+  private val schedulerBacklogTimeoutS = conf.getTimeAsSeconds(
+    "spark.dynamicAllocation.schedulerBacklogTimeout", "5s")
 
-  // Same as above, but used only after `schedulerBacklogTimeout` is exceeded
-  private val sustainedSchedulerBacklogTimeout = conf.getLong(
-    "spark.dynamicAllocation.sustainedSchedulerBacklogTimeout", schedulerBacklogTimeout)
+  // Same as above, but used only after `schedulerBacklogTimeoutS` is exceeded
+  private val sustainedSchedulerBacklogTimeoutS = conf.getTimeAsSeconds(
+    "spark.dynamicAllocation.sustainedSchedulerBacklogTimeout", s"${schedulerBacklogTimeoutS}s")
 
   // How long an executor must be idle for before it is removed (seconds)
-  private val executorIdleTimeout = conf.getLong(
-    "spark.dynamicAllocation.executorIdleTimeout", 600)
+  private val executorIdleTimeoutS = conf.getTimeAsSeconds(
+    "spark.dynamicAllocation.executorIdleTimeout", "600s")
 
   // During testing, the methods to actually kill and add executors are mocked out
   private val testing = conf.getBoolean("spark.dynamicAllocation.testing", false)
@@ -150,14 +150,14 @@ private[spark] class ExecutorAllocationManager(
       throw new SparkException(s"spark.dynamicAllocation.minExecutors ($minNumExecutors) must " +
         s"be less than or equal to spark.dynamicAllocation.maxExecutors ($maxNumExecutors)!")
     }
-    if (schedulerBacklogTimeout <= 0) {
+    if (schedulerBacklogTimeoutS <= 0) {
       throw new SparkException("spark.dynamicAllocation.schedulerBacklogTimeout must be > 0!")
     }
-    if (sustainedSchedulerBacklogTimeout <= 0) {
+    if (sustainedSchedulerBacklogTimeoutS <= 0) {
       throw new SparkException(
         "spark.dynamicAllocation.sustainedSchedulerBacklogTimeout must be > 0!")
     }
-    if (executorIdleTimeout <= 0) {
+    if (executorIdleTimeoutS <= 0) {
       throw new SparkException("spark.dynamicAllocation.executorIdleTimeout must be > 0!")
     }
     // Require external shuffle service for dynamic allocation
@@ -262,8 +262,8 @@ private[spark] class ExecutorAllocationManager(
     } else if (addTime != NOT_SET && now >= addTime) {
       val delta = addExecutors(maxNeeded)
       logDebug(s"Starting timer to add more executors (to " +
-        s"expire in $sustainedSchedulerBacklogTimeout seconds)")
-      addTime += sustainedSchedulerBacklogTimeout * 1000
+        s"expire in $sustainedSchedulerBacklogTimeoutS seconds)")
+      addTime += sustainedSchedulerBacklogTimeoutS * 1000
       delta
     } else {
       0
@@ -351,7 +351,7 @@ private[spark] class ExecutorAllocationManager(
     val removeRequestAcknowledged = testing || client.killExecutor(executorId)
     if (removeRequestAcknowledged) {
       logInfo(s"Removing executor $executorId because it has been idle for " +
-        s"$executorIdleTimeout seconds (new desired total will be ${numExistingExecutors - 1})")
+        s"$executorIdleTimeoutS seconds (new desired total will be ${numExistingExecutors - 1})")
       executorsPendingToRemove.add(executorId)
       true
     } else {
@@ -407,8 +407,8 @@ private[spark] class ExecutorAllocationManager(
   private def onSchedulerBacklogged(): Unit = synchronized {
     if (addTime == NOT_SET) {
       logDebug(s"Starting timer to add executors because pending tasks " +
-        s"are building up (to expire in $schedulerBacklogTimeout seconds)")
-      addTime = clock.getTimeMillis + schedulerBacklogTimeout * 1000
+        s"are building up (to expire in $schedulerBacklogTimeoutS seconds)")
+      addTime = clock.getTimeMillis + schedulerBacklogTimeoutS * 1000
     }
   }
 
@@ -431,8 +431,8 @@ private[spark] class ExecutorAllocationManager(
     if (executorIds.contains(executorId)) {
       if (!removeTimes.contains(executorId) && !executorsPendingToRemove.contains(executorId)) {
         logDebug(s"Starting idle timer for $executorId because there are no more tasks " +
-          s"scheduled to run on the executor (to expire in $executorIdleTimeout seconds)")
-        removeTimes(executorId) = clock.getTimeMillis + executorIdleTimeout * 1000
+          s"scheduled to run on the executor (to expire in $executorIdleTimeoutS seconds)")
+        removeTimes(executorId) = clock.getTimeMillis + executorIdleTimeoutS * 1000
       }
     } else {
       logWarning(s"Attempted to mark unknown executor $executorId idle")

http://git-wip-us.apache.org/repos/asf/spark/blob/c4ab255e/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
index 5871b8c..e3bd16f 100644
--- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
+++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -62,14 +62,17 @@ private[spark] class HeartbeatReceiver(sc: SparkContext)
 
   // "spark.network.timeout" uses "seconds", while `spark.storage.blockManagerSlaveTimeoutMs` uses
   // "milliseconds"
-  private val executorTimeoutMs = sc.conf.getOption("spark.network.timeout").map(_.toLong * 1000).
-    getOrElse(sc.conf.getLong("spark.storage.blockManagerSlaveTimeoutMs", 120000))
-
+  private val slaveTimeoutMs = 
+    sc.conf.getTimeAsMs("spark.storage.blockManagerSlaveTimeoutMs", "120s")
+  private val executorTimeoutMs = 
+    sc.conf.getTimeAsSeconds("spark.network.timeout", s"${slaveTimeoutMs}ms") * 1000
+  
   // "spark.network.timeoutInterval" uses "seconds", while
   // "spark.storage.blockManagerTimeoutIntervalMs" uses "milliseconds"
-  private val checkTimeoutIntervalMs =
-    sc.conf.getOption("spark.network.timeoutInterval").map(_.toLong * 1000).
-      getOrElse(sc.conf.getLong("spark.storage.blockManagerTimeoutIntervalMs", 60000))
+  private val timeoutIntervalMs = 
+    sc.conf.getTimeAsMs("spark.storage.blockManagerTimeoutIntervalMs", "60s")
+  private val checkTimeoutIntervalMs = 
+    sc.conf.getTimeAsSeconds("spark.network.timeoutInterval", s"${timeoutIntervalMs}ms") * 1000
   
   private var timeoutCheckingTask: ScheduledFuture[_] = null
 

http://git-wip-us.apache.org/repos/asf/spark/blob/c4ab255e/core/src/main/scala/org/apache/spark/SparkConf.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 0c123c9..390e631 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -174,6 +174,42 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
     getOption(key).getOrElse(defaultValue)
   }
 
+  /** 
+   * Get a time parameter as seconds; throws a NoSuchElementException if it's not set. If no 
+   * suffix is provided then seconds are assumed.
+   * @throws NoSuchElementException
+   */
+  def getTimeAsSeconds(key: String): Long = {
+    Utils.timeStringAsSeconds(get(key))
+  }
+
+  /** 
+   * Get a time parameter as seconds, falling back to a default if not set. If no 
+   * suffix is provided then seconds are assumed.
+   * 
+   */
+  def getTimeAsSeconds(key: String, defaultValue: String): Long = {
+    Utils.timeStringAsSeconds(get(key, defaultValue))
+  }
+
+  /** 
+   * Get a time parameter as milliseconds; throws a NoSuchElementException if it's not set. If no 
+   * suffix is provided then milliseconds are assumed. 
+   * @throws NoSuchElementException
+   */
+  def getTimeAsMs(key: String): Long = {
+    Utils.timeStringAsMs(get(key))
+  }
+
+  /** 
+   * Get a time parameter as milliseconds, falling back to a default if not set. If no 
+   * suffix is provided then milliseconds are assumed. 
+   */
+  def getTimeAsMs(key: String, defaultValue: String): Long = {
+    Utils.timeStringAsMs(get(key, defaultValue))
+  }
+  
+
   /** Get a parameter as an Option */
   def getOption(key: String): Option[String] = {
     Option(settings.get(key))

http://git-wip-us.apache.org/repos/asf/spark/blob/c4ab255e/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 14f99a4..516f619 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -436,14 +436,14 @@ private[spark] class Executor(
    * This thread stops running when the executor is stopped.
    */
   private def startDriverHeartbeater(): Unit = {
-    val interval = conf.getInt("spark.executor.heartbeatInterval", 10000)
+    val intervalMs = conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s")
     val thread = new Thread() {
       override def run() {
         // Sleep a random interval so the heartbeats don't end up in sync
-        Thread.sleep(interval + (math.random * interval).asInstanceOf[Int])
+        Thread.sleep(intervalMs + (math.random * intervalMs).asInstanceOf[Int])
         while (!isStopped) {
           reportHeartBeat()
-          Thread.sleep(interval)
+          Thread.sleep(intervalMs)
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/c4ab255e/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
index 741fe3e..8e3c30f 100644
--- a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
+++ b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
@@ -82,7 +82,8 @@ private[nio] class ConnectionManager(
     new HashedWheelTimer(Utils.namedThreadFactory("AckTimeoutMonitor"))
 
   private val ackTimeout =
-    conf.getInt("spark.core.connection.ack.wait.timeout", conf.getInt("spark.network.timeout", 120))
+    conf.getTimeAsSeconds("spark.core.connection.ack.wait.timeout",
+      conf.get("spark.network.timeout", "120s"))
 
   // Get the thread counts from the Spark Configuration.
   // 

http://git-wip-us.apache.org/repos/asf/spark/blob/c4ab255e/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 076b36e..2362cc7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -62,10 +62,10 @@ private[spark] class TaskSchedulerImpl(
   val conf = sc.conf
 
   // How often to check for speculative tasks
-  val SPECULATION_INTERVAL = conf.getLong("spark.speculation.interval", 100)
+  val SPECULATION_INTERVAL_MS = conf.getTimeAsMs("spark.speculation.interval", "100ms")
 
   // Threshold above which we warn user initial TaskSet may be starved
-  val STARVATION_TIMEOUT = conf.getLong("spark.starvation.timeout", 15000)
+  val STARVATION_TIMEOUT_MS = conf.getTimeAsMs("spark.starvation.timeout", "15s")
 
   // CPUs to request per task
   val CPUS_PER_TASK = conf.getInt("spark.task.cpus", 1)
@@ -143,8 +143,8 @@ private[spark] class TaskSchedulerImpl(
     if (!isLocal && conf.getBoolean("spark.speculation", false)) {
       logInfo("Starting speculative execution thread")
       import sc.env.actorSystem.dispatcher
-      sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds,
-            SPECULATION_INTERVAL milliseconds) {
+      sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL_MS milliseconds,
+            SPECULATION_INTERVAL_MS milliseconds) {
         Utils.tryOrStopSparkContext(sc) { checkSpeculatableTasks() }
       }
     }
@@ -173,7 +173,7 @@ private[spark] class TaskSchedulerImpl(
               this.cancel()
             }
           }
-        }, STARVATION_TIMEOUT, STARVATION_TIMEOUT)
+        }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
       }
       hasReceivedTask = true
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/c4ab255e/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index d509881..7dc3252 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -848,15 +848,18 @@ private[spark] class TaskSetManager(
   }
 
   private def getLocalityWait(level: TaskLocality.TaskLocality): Long = {
-    val defaultWait = conf.get("spark.locality.wait", "3000")
-    level match {
-      case TaskLocality.PROCESS_LOCAL =>
-        conf.get("spark.locality.wait.process", defaultWait).toLong
-      case TaskLocality.NODE_LOCAL =>
-        conf.get("spark.locality.wait.node", defaultWait).toLong
-      case TaskLocality.RACK_LOCAL =>
-        conf.get("spark.locality.wait.rack", defaultWait).toLong
-      case _ => 0L
+    val defaultWait = conf.get("spark.locality.wait", "3s")
+    val localityWaitKey = level match {
+      case TaskLocality.PROCESS_LOCAL => "spark.locality.wait.process"
+      case TaskLocality.NODE_LOCAL => "spark.locality.wait.node"
+      case TaskLocality.RACK_LOCAL => "spark.locality.wait.rack"
+      case _ => null
+    }
+    
+    if (localityWaitKey != null) {
+      conf.getTimeAsMs(localityWaitKey, defaultWait)  
+    } else {
+      0L
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/c4ab255e/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 4c49da8..63987df 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -52,8 +52,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     math.min(1, conf.getDouble("spark.scheduler.minRegisteredResourcesRatio", 0))
   // Submit tasks after maxRegisteredWaitingTime milliseconds
   // if minRegisteredRatio has not yet been reached
-  val maxRegisteredWaitingTime =
-    conf.getInt("spark.scheduler.maxRegisteredResourcesWaitingTime", 30000)
+  val maxRegisteredWaitingTimeMs =
+    conf.getTimeAsMs("spark.scheduler.maxRegisteredResourcesWaitingTime", "30s")
   val createTime = System.currentTimeMillis()
 
   private val executorDataMap = new HashMap[String, ExecutorData]
@@ -77,12 +77,13 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
 
     override def onStart() {
       // Periodically revive offers to allow delay scheduling to work
-      val reviveInterval = conf.getLong("spark.scheduler.revive.interval", 1000)
+      val reviveIntervalMs = conf.getTimeAsMs("spark.scheduler.revive.interval", "1s")
+ 
       reviveThread.scheduleAtFixedRate(new Runnable {
         override def run(): Unit = Utils.tryLogNonFatalError {
           Option(self).foreach(_.send(ReviveOffers))
         }
-      }, 0, reviveInterval, TimeUnit.MILLISECONDS)
+      }, 0, reviveIntervalMs, TimeUnit.MILLISECONDS)
     }
 
     override def receive: PartialFunction[Any, Unit] = {
@@ -301,9 +302,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
         s"reached minRegisteredResourcesRatio: $minRegisteredRatio")
       return true
     }
-    if ((System.currentTimeMillis() - createTime) >= maxRegisteredWaitingTime) {
+    if ((System.currentTimeMillis() - createTime) >= maxRegisteredWaitingTimeMs) {
       logInfo("SchedulerBackend is ready for scheduling beginning after waiting " +
-        s"maxRegisteredResourcesWaitingTime: $maxRegisteredWaitingTime(ms)")
+        s"maxRegisteredResourcesWaitingTime: $maxRegisteredWaitingTimeMs(ms)")
       return true
     }
     false

http://git-wip-us.apache.org/repos/asf/spark/blob/c4ab255e/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index 6c2c526..8e8cc7c 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -20,7 +20,6 @@ package org.apache.spark.util
 import scala.collection.JavaConversions.mapAsJavaMap
 import scala.concurrent.Await
 import scala.concurrent.duration.{Duration, FiniteDuration}
-import scala.util.Try
 
 import akka.actor.{ActorRef, ActorSystem, ExtendedActorSystem}
 import akka.pattern.ask
@@ -66,7 +65,8 @@ private[spark] object AkkaUtils extends Logging {
 
     val akkaThreads   = conf.getInt("spark.akka.threads", 4)
     val akkaBatchSize = conf.getInt("spark.akka.batchSize", 15)
-    val akkaTimeout = conf.getInt("spark.akka.timeout", conf.getInt("spark.network.timeout", 120))
+    val akkaTimeoutS = conf.getTimeAsSeconds("spark.akka.timeout",
+      conf.get("spark.network.timeout", "120s"))
     val akkaFrameSize = maxFrameSizeBytes(conf)
     val akkaLogLifecycleEvents = conf.getBoolean("spark.akka.logLifecycleEvents", false)
     val lifecycleEvents = if (akkaLogLifecycleEvents) "on" else "off"
@@ -78,8 +78,8 @@ private[spark] object AkkaUtils extends Logging {
 
     val logAkkaConfig = if (conf.getBoolean("spark.akka.logAkkaConfig", false)) "on" else "off"
 
-    val akkaHeartBeatPauses = conf.getInt("spark.akka.heartbeat.pauses", 6000)
-    val akkaHeartBeatInterval = conf.getInt("spark.akka.heartbeat.interval", 1000)
+    val akkaHeartBeatPausesS = conf.getTimeAsSeconds("spark.akka.heartbeat.pauses", "6000s")
+    val akkaHeartBeatIntervalS = conf.getTimeAsSeconds("spark.akka.heartbeat.interval", "1000s")
 
     val secretKey = securityManager.getSecretKey()
     val isAuthOn = securityManager.isAuthenticationEnabled()
@@ -102,14 +102,14 @@ private[spark] object AkkaUtils extends Logging {
       |akka.jvm-exit-on-fatal-error = off
       |akka.remote.require-cookie = "$requireCookie"
       |akka.remote.secure-cookie = "$secureCookie"
-      |akka.remote.transport-failure-detector.heartbeat-interval = $akkaHeartBeatInterval s
-      |akka.remote.transport-failure-detector.acceptable-heartbeat-pause = $akkaHeartBeatPauses s
+      |akka.remote.transport-failure-detector.heartbeat-interval = $akkaHeartBeatIntervalS s
+      |akka.remote.transport-failure-detector.acceptable-heartbeat-pause = $akkaHeartBeatPausesS s
       |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
       |akka.remote.netty.tcp.transport-class = "akka.remote.transport.netty.NettyTransport"
       |akka.remote.netty.tcp.hostname = "$host"
       |akka.remote.netty.tcp.port = $port
       |akka.remote.netty.tcp.tcp-nodelay = on
-      |akka.remote.netty.tcp.connection-timeout = $akkaTimeout s
+      |akka.remote.netty.tcp.connection-timeout = $akkaTimeoutS s
       |akka.remote.netty.tcp.maximum-frame-size = ${akkaFrameSize}B
       |akka.remote.netty.tcp.execution-pool-size = $akkaThreads
       |akka.actor.default-dispatcher.throughput = $akkaBatchSize

http://git-wip-us.apache.org/repos/asf/spark/blob/c4ab255e/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
index 375ed43..2bbfc98 100644
--- a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
@@ -76,7 +76,7 @@ private[spark] object MetadataCleanerType extends Enumeration {
 // initialization of StreamingContext. It's okay for users trying to configure stuff themselves.
 private[spark] object MetadataCleaner {
   def getDelaySeconds(conf: SparkConf): Int = {
-    conf.getInt("spark.cleaner.ttl", -1)
+    conf.getTimeAsSeconds("spark.cleaner.ttl", "-1").toInt
   }
 
   def getDelaySeconds(

http://git-wip-us.apache.org/repos/asf/spark/blob/c4ab255e/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index a541d66..1029b0f 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -22,7 +22,7 @@ import java.lang.management.ManagementFactory
 import java.net._
 import java.nio.ByteBuffer
 import java.util.{Properties, Locale, Random, UUID}
-import java.util.concurrent.{ThreadFactory, ConcurrentHashMap, Executors, ThreadPoolExecutor}
+import java.util.concurrent._
 import javax.net.ssl.HttpsURLConnection
 
 import scala.collection.JavaConversions._
@@ -47,6 +47,7 @@ import tachyon.client.{TachyonFS, TachyonFile}
 
 import org.apache.spark._
 import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance}
 
 /** CallSite represents a place in user code. It can have a short and a long form. */
@@ -612,9 +613,10 @@ private[spark] object Utils extends Logging {
         }
         Utils.setupSecureURLConnection(uc, securityMgr)
 
-        val timeout = conf.getInt("spark.files.fetchTimeout", 60) * 1000
-        uc.setConnectTimeout(timeout)
-        uc.setReadTimeout(timeout)
+        val timeoutMs = 
+          conf.getTimeAsSeconds("spark.files.fetchTimeout", "60s").toInt * 1000
+        uc.setConnectTimeout(timeoutMs)
+        uc.setReadTimeout(timeoutMs)
         uc.connect()
         val in = uc.getInputStream()
         downloadFile(url, in, targetFile, fileOverwrite)
@@ -1019,6 +1021,22 @@ private[spark] object Utils extends Logging {
   }
 
   /**
+   * Convert a time parameter such as (50s, 100ms, or 250us) to microseconds for internal use. If
+   * no suffix is provided, the passed number is assumed to be in ms.
+   */
+  def timeStringAsMs(str: String): Long = {
+    JavaUtils.timeStringAsMs(str)
+  }
+
+  /**
+   * Convert a time parameter such as (50s, 100ms, or 250us) to microseconds for internal use. If
+   * no suffix is provided, the passed number is assumed to be in seconds.
+   */
+  def timeStringAsSeconds(str: String): Long = {
+    JavaUtils.timeStringAsSec(str)
+  }
+
+  /**
    * Convert a Java memory parameter passed to -Xmx (such as 300m or 1g) to a number of megabytes.
    */
   def memoryStringToMb(str: String): Int = {

http://git-wip-us.apache.org/repos/asf/spark/blob/c4ab255e/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index 3ded1e4..6b3049b 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -684,10 +684,11 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext wit
       .set("spark.dynamicAllocation.enabled", "true")
       .set("spark.dynamicAllocation.minExecutors", minExecutors.toString)
       .set("spark.dynamicAllocation.maxExecutors", maxExecutors.toString)
-      .set("spark.dynamicAllocation.schedulerBacklogTimeout", schedulerBacklogTimeout.toString)
+      .set("spark.dynamicAllocation.schedulerBacklogTimeout",
+          s"${schedulerBacklogTimeout.toString}s")
       .set("spark.dynamicAllocation.sustainedSchedulerBacklogTimeout",
-        sustainedSchedulerBacklogTimeout.toString)
-      .set("spark.dynamicAllocation.executorIdleTimeout", executorIdleTimeout.toString)
+        s"${sustainedSchedulerBacklogTimeout.toString}s")
+      .set("spark.dynamicAllocation.executorIdleTimeout", s"${executorIdleTimeout.toString}s")
       .set("spark.dynamicAllocation.testing", "true")
     val sc = new SparkContext(conf)
     contexts += sc

http://git-wip-us.apache.org/repos/asf/spark/blob/c4ab255e/core/src/test/scala/org/apache/spark/network/nio/ConnectionManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/network/nio/ConnectionManagerSuite.scala b/core/src/test/scala/org/apache/spark/network/nio/ConnectionManagerSuite.scala
index 716f875..02424c5 100644
--- a/core/src/test/scala/org/apache/spark/network/nio/ConnectionManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/network/nio/ConnectionManagerSuite.scala
@@ -260,8 +260,8 @@ class ConnectionManagerSuite extends FunSuite {
   test("sendMessageReliably timeout") {
     val clientConf = new SparkConf
     clientConf.set("spark.authenticate", "false")
-    val ackTimeout = 30
-    clientConf.set("spark.core.connection.ack.wait.timeout", s"${ackTimeout}")
+    val ackTimeoutS = 30
+    clientConf.set("spark.core.connection.ack.wait.timeout", s"${ackTimeoutS}s")
 
     val clientSecurityManager = new SecurityManager(clientConf)
     val manager = new ConnectionManager(0, clientConf, clientSecurityManager)
@@ -272,7 +272,7 @@ class ConnectionManagerSuite extends FunSuite {
     val managerServer = new ConnectionManager(0, serverConf, serverSecurityManager)
     managerServer.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
       // sleep 60 sec > ack timeout for simulating server slow down or hang up
-      Thread.sleep(ackTimeout * 3 * 1000)
+      Thread.sleep(ackTimeoutS * 3 * 1000)
       None
     })
 
@@ -287,7 +287,7 @@ class ConnectionManagerSuite extends FunSuite {
     // Otherwise TimeoutExcepton is thrown from Await.result.
     // We expect TimeoutException is not thrown.
     intercept[IOException] {
-      Await.result(future, (ackTimeout * 2) second)
+      Await.result(future, (ackTimeoutS * 2) second)
     }
 
     manager.stop()

http://git-wip-us.apache.org/repos/asf/spark/blob/c4ab255e/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index 716d12c..6198cea 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.scheduler
 
-import java.io.{ObjectInputStream, ObjectOutputStream, IOException}
 import java.util.Random
 
 import scala.collection.mutable.ArrayBuffer
@@ -27,7 +26,7 @@ import org.scalatest.FunSuite
 
 import org.apache.spark._
 import org.apache.spark.executor.TaskMetrics
-import org.apache.spark.util.ManualClock
+import org.apache.spark.util.{ManualClock, Utils}
 
 class FakeDAGScheduler(sc: SparkContext, taskScheduler: FakeTaskScheduler)
   extends DAGScheduler(sc) {
@@ -152,7 +151,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
 
   private val conf = new SparkConf
 
-  val LOCALITY_WAIT = conf.getLong("spark.locality.wait", 3000)
+  val LOCALITY_WAIT_MS = conf.getTimeAsMs("spark.locality.wait", "3s")
   val MAX_TASK_FAILURES = 4
 
   override def beforeEach() {
@@ -240,7 +239,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
     assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0)
     assert(manager.resourceOffer("exec1", "host1", PROCESS_LOCAL) == None)
 
-    clock.advance(LOCALITY_WAIT)
+    clock.advance(LOCALITY_WAIT_MS)
     // Offer host1, exec1 again, at NODE_LOCAL level: the node local (task 2) should
     // get chosen before the noPref task
     assert(manager.resourceOffer("exec1", "host1", NODE_LOCAL).get.index == 2)
@@ -251,7 +250,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
     // Offer host2, exec3 again, at NODE_LOCAL level: we should get noPref task
     // after failing to find a node_Local task
     assert(manager.resourceOffer("exec2", "host2", NODE_LOCAL) == None)
-    clock.advance(LOCALITY_WAIT)
+    clock.advance(LOCALITY_WAIT_MS)
     assert(manager.resourceOffer("exec2", "host2", NO_PREF).get.index == 3)
   }
 
@@ -292,7 +291,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
     // Offer host1 again: nothing should get chosen
     assert(manager.resourceOffer("exec1", "host1", ANY) === None)
 
-    clock.advance(LOCALITY_WAIT)
+    clock.advance(LOCALITY_WAIT_MS)
 
     // Offer host1 again: second task (on host2) should get chosen
     assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 1)
@@ -306,7 +305,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
     // Now that we've launched a local task, we should no longer launch the task for host3
     assert(manager.resourceOffer("exec2", "host2", ANY) === None)
 
-    clock.advance(LOCALITY_WAIT)
+    clock.advance(LOCALITY_WAIT_MS)
 
     // After another delay, we can go ahead and launch that task non-locally
     assert(manager.resourceOffer("exec2", "host2", ANY).get.index === 3)
@@ -338,7 +337,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
     // nothing should be chosen
     assert(manager.resourceOffer("exec1", "host1", ANY) === None)
 
-    clock.advance(LOCALITY_WAIT * 2)
+    clock.advance(LOCALITY_WAIT_MS * 2)
 
     // task 1 and 2 would be scheduled as nonLocal task
     assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 1)
@@ -528,7 +527,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
 
     assert(manager.myLocalityLevels.sameElements(Array(PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY)))
     // Set allowed locality to ANY
-    clock.advance(LOCALITY_WAIT * 3)
+    clock.advance(LOCALITY_WAIT_MS * 3)
     // Offer host3
     // No task is scheduled if we restrict locality to RACK_LOCAL
     assert(manager.resourceOffer("execC", "host3", RACK_LOCAL) === None)
@@ -622,12 +621,12 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
     assert(manager.resourceOffer("execA", "host1", NO_PREF).get.index == 1)
 
     manager.speculatableTasks += 1
-    clock.advance(LOCALITY_WAIT)
+    clock.advance(LOCALITY_WAIT_MS)
     // schedule the nonPref task
     assert(manager.resourceOffer("execA", "host1", NO_PREF).get.index === 2)
     // schedule the speculative task
     assert(manager.resourceOffer("execB", "host2", NO_PREF).get.index === 1)
-    clock.advance(LOCALITY_WAIT * 3)
+    clock.advance(LOCALITY_WAIT_MS * 3)
     // schedule non-local tasks
     assert(manager.resourceOffer("execB", "host2", ANY).get.index === 3)
   }
@@ -716,13 +715,13 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
     // Valid locality should contain PROCESS_LOCAL, NODE_LOCAL and ANY
     assert(manager.myLocalityLevels.sameElements(Array(PROCESS_LOCAL, NODE_LOCAL, ANY)))
     assert(manager.resourceOffer("execA", "host1", ANY) !== None)
-    clock.advance(LOCALITY_WAIT * 4)
+    clock.advance(LOCALITY_WAIT_MS * 4)
     assert(manager.resourceOffer("execB.2", "host2", ANY) !== None)
     sched.removeExecutor("execA")
     sched.removeExecutor("execB.2")
     manager.executorLost("execA", "host1")
     manager.executorLost("execB.2", "host2")
-    clock.advance(LOCALITY_WAIT * 4)
+    clock.advance(LOCALITY_WAIT_MS * 4)
     sched.addExecutor("execC", "host3")
     manager.executorAdded()
     // Prior to the fix, this line resulted in an ArrayIndexOutOfBoundsException:

http://git-wip-us.apache.org/repos/asf/spark/blob/c4ab255e/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index b4de90b..ffa5162 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -76,7 +76,7 @@ class BlockManagerReplicationSuite extends FunSuite with Matchers with BeforeAnd
     conf.set("spark.storage.unrollMemoryThreshold", "512")
 
     // to make a replication attempt to inactive store fail fast
-    conf.set("spark.core.connection.ack.wait.timeout", "1")
+    conf.set("spark.core.connection.ack.wait.timeout", "1s")
     // to make cached peers refresh frequently
     conf.set("spark.storage.cachedPeersTtl", "10")
 

http://git-wip-us.apache.org/repos/asf/spark/blob/c4ab255e/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index 449fb87..fb97e65 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -23,6 +23,7 @@ import java.io.{File, ByteArrayOutputStream, ByteArrayInputStream, FileOutputStr
 import java.net.{BindException, ServerSocket, URI}
 import java.nio.{ByteBuffer, ByteOrder}
 import java.text.DecimalFormatSymbols
+import java.util.concurrent.TimeUnit
 import java.util.Locale
 
 import com.google.common.base.Charsets.UTF_8
@@ -35,7 +36,50 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.SparkConf
 
 class UtilsSuite extends FunSuite with ResetSystemProperties {
+  
+  test("timeConversion") {
+    // Test -1
+    assert(Utils.timeStringAsSeconds("-1") === -1)
+    
+    // Test zero
+    assert(Utils.timeStringAsSeconds("0") === 0)
+    
+    assert(Utils.timeStringAsSeconds("1") === 1)
+    assert(Utils.timeStringAsSeconds("1s") === 1)
+    assert(Utils.timeStringAsSeconds("1000ms") === 1)
+    assert(Utils.timeStringAsSeconds("1000000us") === 1)
+    assert(Utils.timeStringAsSeconds("1m") === TimeUnit.MINUTES.toSeconds(1))
+    assert(Utils.timeStringAsSeconds("1min") === TimeUnit.MINUTES.toSeconds(1))
+    assert(Utils.timeStringAsSeconds("1h") === TimeUnit.HOURS.toSeconds(1))
+    assert(Utils.timeStringAsSeconds("1d") === TimeUnit.DAYS.toSeconds(1))
+    
+    assert(Utils.timeStringAsMs("1") === 1)
+    assert(Utils.timeStringAsMs("1ms") === 1)
+    assert(Utils.timeStringAsMs("1000us") === 1)
+    assert(Utils.timeStringAsMs("1s") === TimeUnit.SECONDS.toMillis(1))
+    assert(Utils.timeStringAsMs("1m") === TimeUnit.MINUTES.toMillis(1))
+    assert(Utils.timeStringAsMs("1min") === TimeUnit.MINUTES.toMillis(1))
+    assert(Utils.timeStringAsMs("1h") === TimeUnit.HOURS.toMillis(1))
+    assert(Utils.timeStringAsMs("1d") === TimeUnit.DAYS.toMillis(1))
+    
+    // Test invalid strings
+    intercept[NumberFormatException] {
+      Utils.timeStringAsMs("This breaks 600s")
+    }
+
+    intercept[NumberFormatException] {
+      Utils.timeStringAsMs("This breaks 600ds")
+    }
 
+    intercept[NumberFormatException] {
+      Utils.timeStringAsMs("600s This breaks")
+    }
+
+    intercept[NumberFormatException] {
+      Utils.timeStringAsMs("This 123s breaks")
+    }
+  }
+  
   test("bytesToString") {
     assert(Utils.bytesToString(10) === "10.0 B")
     assert(Utils.bytesToString(1500) === "1500.0 B")

http://git-wip-us.apache.org/repos/asf/spark/blob/c4ab255e/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index 7fe1147..7169ec2 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -35,9 +35,19 @@ val conf = new SparkConf()
 val sc = new SparkContext(conf)
 {% endhighlight %}
 
-Note that we can have more than 1 thread in local mode, and in cases like spark streaming, we may actually
-require one to prevent any sort of starvation issues.
+Note that we can have more than 1 thread in local mode, and in cases like Spark Streaming, we may 
+actually require one to prevent any sort of starvation issues.
 
+Properties that specify some time duration should be configured with a unit of time. 
+The following format is accepted:
+ 
+    25ms (milliseconds)
+    5s (seconds)
+    10m or 10min (minutes)
+    3h (hours)
+    5d (days)
+    1y (years)
+    
 ## Dynamically Loading Spark Properties
 In some cases, you may want to avoid hard-coding certain configurations in a `SparkConf`. For
 instance, if you'd like to run the same application with different masters or different
@@ -429,10 +439,10 @@ Apart from these, the following properties are also available, and may be useful
 </tr>
 <tr>
   <td><code>spark.shuffle.io.retryWait</code></td>
-  <td>5</td>
+  <td>5s</td>
   <td>
-    (Netty only) Seconds to wait between retries of fetches. The maximum delay caused by retrying
-    is simply <code>maxRetries * retryWait</code>, by default 15 seconds.
+    (Netty only) How long to wait between retries of fetches. The maximum delay caused by retrying
+    is 15 seconds by default, calculated as <code>maxRetries * retryWait</code>.
   </td>
 </tr>
 <tr>
@@ -732,17 +742,17 @@ Apart from these, the following properties are also available, and may be useful
 </tr>
 <tr>
     <td><code>spark.executor.heartbeatInterval</code></td>
-    <td>10000</td>
-    <td>Interval (milliseconds) between each executor's heartbeats to the driver.  Heartbeats let
+    <td>10s</td>
+    <td>Interval between each executor's heartbeats to the driver.  Heartbeats let
     the driver know that the executor is still alive and update it with metrics for in-progress
     tasks.</td>
 </tr>
 <tr>
   <td><code>spark.files.fetchTimeout</code></td>
-  <td>60</td>
+  <td>60s</td>
   <td>
     Communication timeout to use when fetching files added through SparkContext.addFile() from
-    the driver, in seconds.
+    the driver.
   </td>
 </tr>
 <tr>
@@ -853,11 +863,11 @@ Apart from these, the following properties are also available, and may be useful
 </tr>
 <tr>
   <td><code>spark.akka.heartbeat.interval</code></td>
-  <td>1000</td>
+  <td>1000s</td>
   <td>
     This is set to a larger value to disable the transport failure detector that comes built in to 
     Akka. It can be enabled again, if you plan to use this feature (Not recommended). A larger 
-    interval value in seconds reduces network overhead and a smaller value ( ~ 1 s) might be more 
+    interval value reduces network overhead and a smaller value ( ~ 1 s) might be more 
     informative for Akka's failure detector. Tune this in combination of `spark.akka.heartbeat.pauses` 
     if you need to. A likely positive use case for using failure detector would be: a sensistive 
     failure detector can help evict rogue executors quickly. However this is usually not the case 
@@ -868,11 +878,11 @@ Apart from these, the following properties are also available, and may be useful
 </tr>
 <tr>
   <td><code>spark.akka.heartbeat.pauses</code></td>
-  <td>6000</td>
+  <td>6000s</td>
   <td>
      This is set to a larger value to disable the transport failure detector that comes built in to Akka.
      It can be enabled again, if you plan to use this feature (Not recommended). Acceptable heart 
-     beat pause in seconds for Akka. This can be used to control sensitivity to GC pauses. Tune
+     beat pause for Akka. This can be used to control sensitivity to GC pauses. Tune
      this along with `spark.akka.heartbeat.interval` if you need to.
   </td>
 </tr>
@@ -886,9 +896,9 @@ Apart from these, the following properties are also available, and may be useful
 </tr>
 <tr>
   <td><code>spark.akka.timeout</code></td>
-  <td>100</td>
+  <td>100s</td>
   <td>
-    Communication timeout between Spark nodes, in seconds.
+    Communication timeout between Spark nodes.
   </td>
 </tr>
 <tr>
@@ -938,10 +948,10 @@ Apart from these, the following properties are also available, and may be useful
 </tr>
 <tr>
   <td><code>spark.network.timeout</code></td>
-  <td>120</td>
+  <td>120s</td>
   <td>
-    Default timeout for all network interactions, in seconds. This config will be used in
-    place of <code>spark.core.connection.ack.wait.timeout</code>, <code>spark.akka.timeout</code>,
+    Default timeout for all network interactions. This config will be used in place of 
+    <code>spark.core.connection.ack.wait.timeout</code>, <code>spark.akka.timeout</code>,
     <code>spark.storage.blockManagerSlaveTimeoutMs</code> or
     <code>spark.shuffle.io.connectionTimeout</code>, if they are not configured.
   </td>
@@ -989,9 +999,9 @@ Apart from these, the following properties are also available, and may be useful
 </tr>
 <tr>
   <td><code>spark.locality.wait</code></td>
-  <td>3000</td>
+  <td>3s</td>
   <td>
-    Number of milliseconds to wait to launch a data-local task before giving up and launching it
+    How long to wait to launch a data-local task before giving up and launching it
     on a less-local node. The same wait will be used to step through multiple locality levels
     (process-local, node-local, rack-local and then any). It is also possible to customize the
     waiting time for each level by setting <code>spark.locality.wait.node</code>, etc.
@@ -1024,10 +1034,9 @@ Apart from these, the following properties are also available, and may be useful
 </tr>
 <tr>
   <td><code>spark.scheduler.maxRegisteredResourcesWaitingTime</code></td>
-  <td>30000</td>
+  <td>30s</td>
   <td>
-    Maximum amount of time to wait for resources to register before scheduling begins
-    (in milliseconds).
+    Maximum amount of time to wait for resources to register before scheduling begins.
   </td>
 </tr>
 <tr>
@@ -1054,10 +1063,9 @@ Apart from these, the following properties are also available, and may be useful
 </tr>
 <tr>
   <td><code>spark.scheduler.revive.interval</code></td>
-  <td>1000</td>
+  <td>1s</td>
   <td>
-    The interval length for the scheduler to revive the worker resource offers to run tasks
-    (in milliseconds).
+    The interval length for the scheduler to revive the worker resource offers to run tasks.
   </td>
 </tr>
 <tr>
@@ -1070,9 +1078,9 @@ Apart from these, the following properties are also available, and may be useful
 </tr>
 <tr>
   <td><code>spark.speculation.interval</code></td>
-  <td>100</td>
+  <td>100ms</td>
   <td>
-    How often Spark will check for tasks to speculate, in milliseconds.
+    How often Spark will check for tasks to speculate.
   </td>
 </tr>
 <tr>
@@ -1127,10 +1135,10 @@ Apart from these, the following properties are also available, and may be useful
 </tr>
 <tr>
   <td><code>spark.dynamicAllocation.executorIdleTimeout</code></td>
-  <td>600</td>
+  <td>600s</td>
   <td>
-    If dynamic allocation is enabled and an executor has been idle for more than this duration
-    (in seconds), the executor will be removed. For more detail, see this
+    If dynamic allocation is enabled and an executor has been idle for more than this duration, 
+    the executor will be removed. For more detail, see this
     <a href="job-scheduling.html#resource-allocation-policy">description</a>.
   </td>
 </tr>
@@ -1157,10 +1165,10 @@ Apart from these, the following properties are also available, and may be useful
 </tr>
 <tr>
   <td><code>spark.dynamicAllocation.schedulerBacklogTimeout</code></td>
-  <td>5</td>
+  <td>5s</td>
   <td>
     If dynamic allocation is enabled and there have been pending tasks backlogged for more than
-    this duration (in seconds), new executors will be requested. For more detail, see this
+    this duration, new executors will be requested. For more detail, see this
     <a href="job-scheduling.html#resource-allocation-policy">description</a>.
   </td>
 </tr>
@@ -1215,18 +1223,18 @@ Apart from these, the following properties are also available, and may be useful
 </tr>
 <tr>
   <td><code>spark.core.connection.ack.wait.timeout</code></td>
-  <td>60</td>
+  <td>60s</td>
   <td>
-    Number of seconds for the connection to wait for ack to occur before timing
+    How long for the connection to wait for ack to occur before timing
     out and giving up. To avoid unwilling timeout caused by long pause like GC,
     you can set larger value.
   </td>
 </tr>
 <tr>
   <td><code>spark.core.connection.auth.wait.timeout</code></td>
-  <td>30</td>
+  <td>30s</td>
   <td>
-    Number of seconds for the connection to wait for authentication to occur before timing
+    How long for the connection to wait for authentication to occur before timing
     out and giving up.
   </td>
 </tr>
@@ -1347,9 +1355,9 @@ Apart from these, the following properties are also available, and may be useful
 <tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
 <tr>
   <td><code>spark.streaming.blockInterval</code></td>
-  <td>200</td>
+  <td>200ms</td>
   <td>
-    Interval (milliseconds) at which data received by Spark Streaming receivers is chunked
+    Interval at which data received by Spark Streaming receivers is chunked
     into blocks of data before storing them in Spark. Minimum recommended - 50 ms. See the
     <a href="streaming-programming-guide.html#level-of-parallelism-in-data-receiving">performance
      tuning</a> section in the Spark Streaming programing guide for more details.

http://git-wip-us.apache.org/repos/asf/spark/blob/c4ab255e/docs/running-on-yarn.md
----------------------------------------------------------------------
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index ed5bb26..853c9f2 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -48,9 +48,9 @@ Most of the configs are the same for Spark on YARN as for other deployment modes
 </tr>
 <tr>
   <td><code>spark.yarn.am.waitTime</code></td>
-  <td>100000</td>
+  <td>100s</td>
   <td>
-    In yarn-cluster mode, time in milliseconds for the application master to wait for the
+    In yarn-cluster mode, time for the application master to wait for the
     SparkContext to be initialized. In yarn-client mode, time for the application master to wait
     for the driver to connect to it.
   </td>

http://git-wip-us.apache.org/repos/asf/spark/blob/c4ab255e/network/common/src/main/java/org/apache/spark/network/util/JavaUtils.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/util/JavaUtils.java b/network/common/src/main/java/org/apache/spark/network/util/JavaUtils.java
index 73da9b7..b6fbace 100644
--- a/network/common/src/main/java/org/apache/spark/network/util/JavaUtils.java
+++ b/network/common/src/main/java/org/apache/spark/network/util/JavaUtils.java
@@ -21,9 +21,13 @@ import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import com.google.common.base.Charsets;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
 import io.netty.buffer.Unpooled;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -121,4 +125,66 @@ public class JavaUtils {
     }
     return !fileInCanonicalDir.getCanonicalFile().equals(fileInCanonicalDir.getAbsoluteFile());
   }
+
+  private static ImmutableMap<String, TimeUnit> timeSuffixes = 
+    ImmutableMap.<String, TimeUnit>builder()
+      .put("us", TimeUnit.MICROSECONDS)
+      .put("ms", TimeUnit.MILLISECONDS)
+      .put("s", TimeUnit.SECONDS)
+      .put("m", TimeUnit.MINUTES)
+      .put("min", TimeUnit.MINUTES)
+      .put("h", TimeUnit.HOURS)
+      .put("d", TimeUnit.DAYS)
+      .build();
+
+  /**
+   * Convert a passed time string (e.g. 50s, 100ms, or 250us) to a time count for
+   * internal use. If no suffix is provided a direct conversion is attempted.
+   */
+  private static long parseTimeString(String str, TimeUnit unit) {
+    String lower = str.toLowerCase().trim();
+    
+    try {
+      String suffix;
+      long val;
+      Matcher m = Pattern.compile("(-?[0-9]+)([a-z]+)?").matcher(lower);
+      if (m.matches()) {
+        val = Long.parseLong(m.group(1));
+        suffix = m.group(2);
+      } else {
+        throw new NumberFormatException("Failed to parse time string: " + str);
+      }
+      
+      // Check for invalid suffixes
+      if (suffix != null && !timeSuffixes.containsKey(suffix)) {
+        throw new NumberFormatException("Invalid suffix: \"" + suffix + "\"");
+      }
+      
+      // If suffix is valid use that, otherwise none was provided and use the default passed
+      return unit.convert(val, suffix != null ? timeSuffixes.get(suffix) : unit);
+    } catch (NumberFormatException e) {
+      String timeError = "Time must be specified as seconds (s), " +
+              "milliseconds (ms), microseconds (us), minutes (m or min) hour (h), or day (d). " +
+              "E.g. 50s, 100ms, or 250us.";
+      
+      throw new NumberFormatException(timeError + "\n" + e.getMessage());
+    }
+  }
+  
+  /**
+   * Convert a time parameter such as (50s, 100ms, or 250us) to milliseconds for internal use. If
+   * no suffix is provided, the passed number is assumed to be in ms.
+   */
+  public static long timeStringAsMs(String str) {
+    return parseTimeString(str, TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * Convert a time parameter such as (50s, 100ms, or 250us) to seconds for internal use. If
+   * no suffix is provided, the passed number is assumed to be in seconds.
+   */
+  public static long timeStringAsSec(String str) {
+    return parseTimeString(str, TimeUnit.SECONDS);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/c4ab255e/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java b/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java
index 2eaf3b7..0aef7f1 100644
--- a/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java
+++ b/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java
@@ -37,8 +37,11 @@ public class TransportConf {
 
   /** Connect timeout in milliseconds. Default 120 secs. */
   public int connectionTimeoutMs() {
-    int defaultTimeout = conf.getInt("spark.network.timeout", 120);
-    return conf.getInt("spark.shuffle.io.connectionTimeout", defaultTimeout) * 1000;
+    long defaultNetworkTimeoutS = JavaUtils.timeStringAsSec(
+      conf.get("spark.network.timeout", "120s"));
+    long defaultTimeoutMs = JavaUtils.timeStringAsSec(
+      conf.get("spark.shuffle.io.connectionTimeout", defaultNetworkTimeoutS + "s")) * 1000;
+    return (int) defaultTimeoutMs;
   }
 
   /** Number of concurrent connections between two nodes for fetching data. */
@@ -68,7 +71,9 @@ public class TransportConf {
   public int sendBuf() { return conf.getInt("spark.shuffle.io.sendBuffer", -1); }
 
   /** Timeout for a single round trip of SASL token exchange, in milliseconds. */
-  public int saslRTTimeoutMs() { return conf.getInt("spark.shuffle.sasl.timeout", 30) * 1000; }
+  public int saslRTTimeoutMs() {
+    return (int) JavaUtils.timeStringAsSec(conf.get("spark.shuffle.sasl.timeout", "30s")) * 1000;
+  }
 
   /**
    * Max number of times we will try IO exceptions (such as connection timeouts) per request.
@@ -80,7 +85,9 @@ public class TransportConf {
    * Time (in milliseconds) that we will wait in order to perform a retry after an IOException.
    * Only relevant if maxIORetries &gt; 0.
    */
-  public int ioRetryWaitTimeMs() { return conf.getInt("spark.shuffle.io.retryWait", 5) * 1000; }
+  public int ioRetryWaitTimeMs() {
+    return (int) JavaUtils.timeStringAsSec(conf.get("spark.shuffle.io.retryWait", "5s")) * 1000;
+  }
 
   /**
    * Minimum size of a block that we should start using memory map rather than reading in through

http://git-wip-us.apache.org/repos/asf/spark/blob/c4ab255e/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
index 42514d8..f4963a7 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
@@ -24,7 +24,7 @@ import scala.collection.mutable.ArrayBuffer
 import org.apache.spark.{Logging, SparkConf}
 import org.apache.spark.storage.StreamBlockId
 import org.apache.spark.streaming.util.RecurringTimer
-import org.apache.spark.util.SystemClock
+import org.apache.spark.util.{SystemClock, Utils}
 
 /** Listener object for BlockGenerator events */
 private[streaming] trait BlockGeneratorListener {
@@ -79,9 +79,9 @@ private[streaming] class BlockGenerator(
   private case class Block(id: StreamBlockId, buffer: ArrayBuffer[Any])
 
   private val clock = new SystemClock()
-  private val blockInterval = conf.getLong("spark.streaming.blockInterval", 200)
+  private val blockIntervalMs = conf.getTimeAsMs("spark.streaming.blockInterval", "200ms")
   private val blockIntervalTimer =
-    new RecurringTimer(clock, blockInterval, updateCurrentBuffer, "BlockGenerator")
+    new RecurringTimer(clock, blockIntervalMs, updateCurrentBuffer, "BlockGenerator")
   private val blockQueueSize = conf.getInt("spark.streaming.blockQueueSize", 10)
   private val blocksForPushing = new ArrayBlockingQueue[Block](blockQueueSize)
   private val blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } }
@@ -132,7 +132,7 @@ private[streaming] class BlockGenerator(
       val newBlockBuffer = currentBuffer
       currentBuffer = new ArrayBuffer[Any]
       if (newBlockBuffer.size > 0) {
-        val blockId = StreamBlockId(receiverId, time - blockInterval)
+        val blockId = StreamBlockId(receiverId, time - blockIntervalMs)
         val newBlock = new Block(blockId, newBlockBuffer)
         listener.onGenerateBlock(blockId)
         blocksForPushing.put(newBlock)  // put is blocking when queue is full

http://git-wip-us.apache.org/repos/asf/spark/blob/c4ab255e/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index 4946806..58e5663 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@ -24,7 +24,7 @@ import akka.actor.{ActorRef, Props, Actor}
 import org.apache.spark.{SparkEnv, Logging}
 import org.apache.spark.streaming.{Checkpoint, CheckpointWriter, Time}
 import org.apache.spark.streaming.util.RecurringTimer
-import org.apache.spark.util.{Clock, ManualClock}
+import org.apache.spark.util.{Clock, ManualClock, Utils}
 
 /** Event classes for JobGenerator */
 private[scheduler] sealed trait JobGeneratorEvent
@@ -104,17 +104,15 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
     if (processReceivedData) {
       logInfo("Stopping JobGenerator gracefully")
       val timeWhenStopStarted = System.currentTimeMillis()
-      val stopTimeout = conf.getLong(
-        "spark.streaming.gracefulStopTimeout",
-        10 * ssc.graph.batchDuration.milliseconds
-      )
+      val stopTimeoutMs = conf.getTimeAsMs(
+        "spark.streaming.gracefulStopTimeout", s"${10 * ssc.graph.batchDuration.milliseconds}ms")
       val pollTime = 100
 
       // To prevent graceful stop to get stuck permanently
       def hasTimedOut: Boolean = {
-        val timedOut = (System.currentTimeMillis() - timeWhenStopStarted) > stopTimeout
+        val timedOut = (System.currentTimeMillis() - timeWhenStopStarted) > stopTimeoutMs
         if (timedOut) {
-          logWarning("Timed out while stopping the job generator (timeout = " + stopTimeout + ")")
+          logWarning("Timed out while stopping the job generator (timeout = " + stopTimeoutMs + ")")
         }
         timedOut
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/c4ab255e/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
index 10c35cb..91261a9 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
@@ -131,11 +131,11 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable {
 
   test("block generator") {
     val blockGeneratorListener = new FakeBlockGeneratorListener
-    val blockInterval = 200
-    val conf = new SparkConf().set("spark.streaming.blockInterval", blockInterval.toString)
+    val blockIntervalMs = 200
+    val conf = new SparkConf().set("spark.streaming.blockInterval", s"${blockIntervalMs}ms")
     val blockGenerator = new BlockGenerator(blockGeneratorListener, 1, conf)
     val expectedBlocks = 5
-    val waitTime = expectedBlocks * blockInterval + (blockInterval / 2)
+    val waitTime = expectedBlocks * blockIntervalMs + (blockIntervalMs / 2)
     val generatedData = new ArrayBuffer[Int]
 
     // Generate blocks
@@ -157,15 +157,15 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable {
 
   test("block generator throttling") {
     val blockGeneratorListener = new FakeBlockGeneratorListener
-    val blockInterval = 100
+    val blockIntervalMs = 100
     val maxRate = 100
-    val conf = new SparkConf().set("spark.streaming.blockInterval", blockInterval.toString).
+    val conf = new SparkConf().set("spark.streaming.blockInterval", s"${blockIntervalMs}ms").
       set("spark.streaming.receiver.maxRate", maxRate.toString)
     val blockGenerator = new BlockGenerator(blockGeneratorListener, 1, conf)
     val expectedBlocks = 20
-    val waitTime = expectedBlocks * blockInterval
+    val waitTime = expectedBlocks * blockIntervalMs
     val expectedMessages = maxRate * waitTime / 1000
-    val expectedMessagesPerBlock = maxRate * blockInterval / 1000
+    val expectedMessagesPerBlock = maxRate * blockIntervalMs / 1000
     val generatedData = new ArrayBuffer[Int]
 
     // Generate blocks

http://git-wip-us.apache.org/repos/asf/spark/blob/c4ab255e/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index d1bbf39..58353a5 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -73,9 +73,9 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
 
   test("from conf with settings") {
     val myConf = SparkContext.updatedConf(new SparkConf(false), master, appName)
-    myConf.set("spark.cleaner.ttl", "10")
+    myConf.set("spark.cleaner.ttl", "10s")
     ssc = new StreamingContext(myConf, batchDuration)
-    assert(ssc.conf.getInt("spark.cleaner.ttl", -1) === 10)
+    assert(ssc.conf.getTimeAsSeconds("spark.cleaner.ttl", "-1") === 10)
   }
 
   test("from existing SparkContext") {
@@ -85,24 +85,26 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
 
   test("from existing SparkContext with settings") {
     val myConf = SparkContext.updatedConf(new SparkConf(false), master, appName)
-    myConf.set("spark.cleaner.ttl", "10")
+    myConf.set("spark.cleaner.ttl", "10s")
     ssc = new StreamingContext(myConf, batchDuration)
-    assert(ssc.conf.getInt("spark.cleaner.ttl", -1) === 10)
+    assert(ssc.conf.getTimeAsSeconds("spark.cleaner.ttl", "-1") === 10)
   }
 
   test("from checkpoint") {
     val myConf = SparkContext.updatedConf(new SparkConf(false), master, appName)
-    myConf.set("spark.cleaner.ttl", "10")
+    myConf.set("spark.cleaner.ttl", "10s")
     val ssc1 = new StreamingContext(myConf, batchDuration)
     addInputStream(ssc1).register()
     ssc1.start()
     val cp = new Checkpoint(ssc1, Time(1000))
-    assert(cp.sparkConfPairs.toMap.getOrElse("spark.cleaner.ttl", "-1") === "10")
+    assert(
+      Utils.timeStringAsSeconds(cp.sparkConfPairs
+          .toMap.getOrElse("spark.cleaner.ttl", "-1")) === 10)
     ssc1.stop()
     val newCp = Utils.deserialize[Checkpoint](Utils.serialize(cp))
-    assert(newCp.createSparkConf().getInt("spark.cleaner.ttl", -1) === 10)
+    assert(newCp.createSparkConf().getTimeAsSeconds("spark.cleaner.ttl", "-1") === 10)
     ssc = new StreamingContext(null, newCp, null)
-    assert(ssc.conf.getInt("spark.cleaner.ttl", -1) === 10)
+    assert(ssc.conf.getTimeAsSeconds("spark.cleaner.ttl", "-1") === 10)
   }
 
   test("start and stop state check") {
@@ -176,7 +178,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
 
   test("stop gracefully") {
     val conf = new SparkConf().setMaster(master).setAppName(appName)
-    conf.set("spark.cleaner.ttl", "3600")
+    conf.set("spark.cleaner.ttl", "3600s")
     sc = new SparkContext(conf)
     for (i <- 1 to 4) {
       logInfo("==================================\n\n\n")
@@ -207,7 +209,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
 
   test("stop slow receiver gracefully") {
     val conf = new SparkConf().setMaster(master).setAppName(appName)
-    conf.set("spark.streaming.gracefulStopTimeout", "20000")
+    conf.set("spark.streaming.gracefulStopTimeout", "20000s")
     sc = new SparkContext(conf)
     logInfo("==================================\n\n\n")
     ssc = new StreamingContext(sc, Milliseconds(100))

http://git-wip-us.apache.org/repos/asf/spark/blob/c4ab255e/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 26259ce..c357b7a 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -296,7 +296,7 @@ private[spark] class ApplicationMaster(
 
     // we want to be reasonably responsive without causing too many requests to RM.
     val schedulerInterval =
-      sparkConf.getLong("spark.yarn.scheduler.heartbeat.interval-ms", 5000)
+      sparkConf.getTimeAsMs("spark.yarn.scheduler.heartbeat.interval-ms", "5s")
 
     // must be <= expiryInterval / 2.
     val interval = math.max(0, math.min(expiryInterval / 2, schedulerInterval))
@@ -379,7 +379,8 @@ private[spark] class ApplicationMaster(
         logWarning(
           "spark.yarn.applicationMaster.waitTries is deprecated, use spark.yarn.am.waitTime")
       }
-      val totalWaitTime = sparkConf.getLong("spark.yarn.am.waitTime", waitTries.getOrElse(100000L))
+      val totalWaitTime = sparkConf.getTimeAsMs("spark.yarn.am.waitTime", 
+        s"${waitTries.getOrElse(100000L)}ms")
       val deadline = System.currentTimeMillis() + totalWaitTime
 
       while (sparkContextRef.get() == null && System.currentTimeMillis < deadline && !finished) {
@@ -404,8 +405,8 @@ private[spark] class ApplicationMaster(
 
     // Spark driver should already be up since it launched us, but we don't want to
     // wait forever, so wait 100 seconds max to match the cluster mode setting.
-    val totalWaitTime = sparkConf.getLong("spark.yarn.am.waitTime", 100000L)
-    val deadline = System.currentTimeMillis + totalWaitTime
+    val totalWaitTimeMs = sparkConf.getTimeAsMs("spark.yarn.am.waitTime", "100s")
+    val deadline = System.currentTimeMillis + totalWaitTimeMs
 
     while (!driverUp && !finished && System.currentTimeMillis < deadline) {
       try {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org