Posted to commits@spark.apache.org by sr...@apache.org on 2019/02/13 19:12:38 UTC

[spark] branch master updated: [SPARK-26817][CORE] Use System.nanoTime to measure time intervals

This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new a829234  [SPARK-26817][CORE] Use System.nanoTime to measure time intervals
a829234 is described below

commit a829234df35c87c169425f2c79fd1963b5420888
Author: Maxim Gekk <ma...@gmail.com>
AuthorDate: Wed Feb 13 13:12:16 2019 -0600

    [SPARK-26817][CORE] Use System.nanoTime to measure time intervals
    
    ## What changes were proposed in this pull request?
    
    In this PR, I propose to use `System.nanoTime()` instead of `System.currentTimeMillis()` for measuring time intervals.
    
    `System.currentTimeMillis()` returns the current wall-clock time and will follow changes to the system clock. Thus, negative wall-clock adjustments can cause timeouts to "hang" for a long time (until the wall-clock time has caught up to its previous value again). This can happen when ntpd does a "step" after the network has been disconnected for some time. The canonical example is during system bootup, when DHCP takes longer than usual. This can lead to failures that are really hard to understand [...]
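    
    Below is a minimal sketch (not part of the patch) of the monotonic measurement pattern applied throughout this change; `doWork()` is an illustrative placeholder:
    
    ```scala
    import java.util.concurrent.TimeUnit
    
    def doWork(): Unit = Thread.sleep(50) // hypothetical workload
    
    // System.nanoTime() is monotonic, so the measured interval is unaffected
    // by wall-clock adjustments (ntpd steps, manual clock changes, etc.).
    val startTimeNs = System.nanoTime()
    doWork()
    val elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNs)
    println(s"doWork took $elapsedMs ms")
    ```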
    
    ## How was this patch tested?
    
    By existing test suites.
    
    Closes #23727 from MaxGekk/system-nanotime.
    
    Lead-authored-by: Maxim Gekk <ma...@gmail.com>
    Co-authored-by: Maxim Gekk <ma...@databricks.com>
    Signed-off-by: Sean Owen <se...@databricks.com>
---
 .../scala/org/apache/spark/MapOutputTracker.scala  |  6 +--
 .../spark/api/python/PythonWorkerFactory.scala     | 12 +++---
 .../apache/spark/broadcast/TorrentBroadcast.scala  |  4 +-
 .../scala/org/apache/spark/executor/Executor.scala | 45 +++++++++++++---------
 .../org/apache/spark/scheduler/ResultTask.scala    |  4 +-
 .../apache/spark/scheduler/ShuffleMapTask.scala    |  4 +-
 .../scala/org/apache/spark/scheduler/Task.scala    |  4 +-
 .../cluster/CoarseGrainedSchedulerBackend.scala    | 10 ++---
 .../apache/spark/security/CryptoStreamUtils.scala  |  7 ++--
 .../org/apache/spark/storage/BlockManager.scala    | 35 ++++++++---------
 .../scala/org/apache/spark/storage/DiskStore.scala | 11 ++----
 .../storage/ShuffleBlockFetcherIterator.scala      | 16 ++++----
 .../main/scala/org/apache/spark/util/Utils.scala   | 17 ++++----
 .../org/apache/spark/ContextCleanerSuite.scala     |  5 ++-
 .../org/apache/spark/JobCancellationSuite.scala    |  6 +--
 .../scala/org/apache/spark/SparkConfSuite.scala    |  4 +-
 .../scala/org/apache/spark/ThreadingSuite.scala    |  7 ++--
 .../scala/org/apache/spark/util/UtilsSuite.scala   | 15 ++++----
 .../spark/util/collection/SizeTrackerSuite.scala   |  6 ++-
 .../apache/spark/util/collection/SorterSuite.scala |  6 ++-
 .../scala/org/apache/spark/examples/HdfsTest.scala |  8 ++--
 .../kafka010/KafkaDontFailOnDataLossSuite.scala    |  7 ++--
 .../spark/streaming/kafka010/KafkaTestUtils.scala  |  8 ++--
 .../streaming/kinesis/KinesisBackedBlockRDD.scala  |  9 ++++-
 .../spark/streaming/kinesis/KinesisTestUtils.scala |  5 +--
 .../org/apache/spark/graphx/GraphLoader.scala      |  7 +++-
 .../org/apache/spark/ml/util/stopwatches.scala     |  4 +-
 .../org/apache/spark/ml/util/StopwatchSuite.scala  |  3 +-
 .../mesos/MesosCoarseGrainedSchedulerBackend.scala | 11 +++---
 .../spark/deploy/yarn/ApplicationMaster.scala      |  8 ++--
 .../spark/deploy/yarn/BaseYarnClusterSuite.scala   |  4 +-
 .../sql/streaming/StreamingQueryManager.scala      |  7 +++-
 .../sql/hive/thriftserver/SparkSQLCLIDriver.scala  | 13 ++++---
 .../org/apache/spark/streaming/Checkpoint.scala    | 11 +++---
 .../spark/streaming/scheduler/JobGenerator.scala   |  7 +++-
 .../spark/streaming/JavaReceiverAPISuite.java      |  7 ++--
 .../apache/spark/streaming/InputStreamsSuite.scala |  4 +-
 .../apache/spark/streaming/MasterFailureTest.scala |  9 +++--
 .../org/apache/spark/streaming/ReceiverSuite.scala |  6 +--
 .../org/apache/spark/streaming/TestSuiteBase.scala |  8 ++--
 40 files changed, 202 insertions(+), 168 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 803703f..1d4b1ef 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -18,7 +18,7 @@
 package org.apache.spark
 
 import java.io._
-import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue, ThreadPoolExecutor}
+import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}
 import java.util.zip.{GZIPInputStream, GZIPOutputStream}
 
 import scala.collection.JavaConverters._
@@ -706,7 +706,7 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr
     val statuses = mapStatuses.get(shuffleId).orNull
     if (statuses == null) {
       logInfo("Don't have map outputs for shuffle " + shuffleId + ", fetching them")
-      val startTime = System.currentTimeMillis
+      val startTimeNs = System.nanoTime()
       var fetchedStatuses: Array[MapStatus] = null
       fetching.synchronized {
         // Someone else is fetching it; wait for them to be done
@@ -744,7 +744,7 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr
         }
       }
       logDebug(s"Fetching map output statuses for shuffle $shuffleId took " +
-        s"${System.currentTimeMillis - startTime} ms")
+        s"${TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNs)} ms")
 
       if (fetchedStatuses != null) {
         fetchedStatuses
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
index 09e219f..97aab74 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
@@ -19,8 +19,8 @@ package org.apache.spark.api.python
 
 import java.io.{DataInputStream, DataOutputStream, EOFException, InputStream, OutputStreamWriter}
 import java.net.{InetAddress, ServerSocket, Socket, SocketException}
-import java.nio.charset.StandardCharsets
 import java.util.Arrays
+import java.util.concurrent.TimeUnit
 import javax.annotation.concurrent.GuardedBy
 
 import scala.collection.JavaConverters._
@@ -84,7 +84,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
   @GuardedBy("self")
   private val idleWorkers = new mutable.Queue[Socket]()
   @GuardedBy("self")
-  private var lastActivity = 0L
+  private var lastActivityNs = 0L
   new MonitorThread().start()
 
   @GuardedBy("self")
@@ -291,9 +291,9 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
     override def run() {
       while (true) {
         self.synchronized {
-          if (lastActivity + IDLE_WORKER_TIMEOUT_MS < System.currentTimeMillis()) {
+          if (IDLE_WORKER_TIMEOUT_NS < System.nanoTime() - lastActivityNs) {
             cleanupIdleWorkers()
-            lastActivity = System.currentTimeMillis()
+            lastActivityNs = System.nanoTime()
           }
         }
         Thread.sleep(10000)
@@ -358,7 +358,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
   def releaseWorker(worker: Socket) {
     if (useDaemon) {
       self.synchronized {
-        lastActivity = System.currentTimeMillis()
+        lastActivityNs = System.nanoTime()
         idleWorkers.enqueue(worker)
       }
     } else {
@@ -375,5 +375,5 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
 
 private object PythonWorkerFactory {
   val PROCESS_WAIT_TIMEOUT_MS = 10000
-  val IDLE_WORKER_TIMEOUT_MS = 60000  // kill idle workers after 1 minute
+  val IDLE_WORKER_TIMEOUT_NS = TimeUnit.MINUTES.toNanos(1)  // kill idle workers after 1 minute
 }
diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
index 7680587..f416be8 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -239,9 +239,9 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
             val estimatedTotalSize = Utils.bytesToString(numBlocks * blockSize)
             logInfo(s"Started reading broadcast variable $id with $numBlocks pieces " +
               s"(estimated total size $estimatedTotalSize)")
-            val startTimeMs = System.currentTimeMillis()
+            val startTimeNs = System.nanoTime()
             val blocks = readBlocks()
-            logInfo("Reading broadcast variable " + id + " took" + Utils.getUsedTimeMs(startTimeMs))
+            logInfo(s"Reading broadcast variable $id took ${Utils.getUsedTimeNs(startTimeNs)}")
 
             try {
               val obj = TorrentBroadcast.unBlockifyObject[T](
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index fccb1ac..6b23b26 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -348,10 +348,11 @@ private[spark] class Executor(
      *    2. Collect accumulator updates
      *    3. Set the finished flag to true and clear current thread's interrupt status
      */
-    private def collectAccumulatorsAndResetStatusOnFailure(taskStartTime: Long) = {
+    private def collectAccumulatorsAndResetStatusOnFailure(taskStartTimeNs: Long) = {
       // Report executor runtime and JVM gc time
       Option(task).foreach(t => {
-        t.metrics.setExecutorRunTime(System.currentTimeMillis() - taskStartTime)
+        t.metrics.setExecutorRunTime(TimeUnit.NANOSECONDS.toMillis(
+          System.nanoTime() - taskStartTimeNs))
         t.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime)
       })
 
@@ -369,7 +370,7 @@ private[spark] class Executor(
       Thread.currentThread.setName(threadName)
       val threadMXBean = ManagementFactory.getThreadMXBean
       val taskMemoryManager = new TaskMemoryManager(env.memoryManager, taskId)
-      val deserializeStartTime = System.currentTimeMillis()
+      val deserializeStartTimeNs = System.nanoTime()
       val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
         threadMXBean.getCurrentThreadCpuTime
       } else 0L
@@ -377,7 +378,7 @@ private[spark] class Executor(
       val ser = env.closureSerializer.newInstance()
       logInfo(s"Running $taskName (TID $taskId)")
       execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
-      var taskStartTime: Long = 0
+      var taskStartTimeNs: Long = 0
       var taskStartCpu: Long = 0
       startGCTime = computeTotalGcTime()
 
@@ -413,7 +414,7 @@ private[spark] class Executor(
         }
 
         // Run the actual task and measure its runtime.
-        taskStartTime = System.currentTimeMillis()
+        taskStartTimeNs = System.nanoTime()
         taskStartCpu = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
           threadMXBean.getCurrentThreadCpuTime
         } else 0L
@@ -457,7 +458,7 @@ private[spark] class Executor(
             s"unrecoverable fetch failures!  Most likely this means user code is incorrectly " +
             s"swallowing Spark's internal ${classOf[FetchFailedException]}", fetchFailure)
         }
-        val taskFinish = System.currentTimeMillis()
+        val taskFinishNs = System.nanoTime()
         val taskFinishCpu = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
           threadMXBean.getCurrentThreadCpuTime
         } else 0L
@@ -466,22 +467,24 @@ private[spark] class Executor(
         task.context.killTaskIfInterrupted()
 
         val resultSer = env.serializer.newInstance()
-        val beforeSerialization = System.currentTimeMillis()
+        val beforeSerializationNs = System.nanoTime()
         val valueBytes = resultSer.serialize(value)
-        val afterSerialization = System.currentTimeMillis()
+        val afterSerializationNs = System.nanoTime()
 
         // Deserialization happens in two parts: first, we deserialize a Task object, which
         // includes the Partition. Second, Task.run() deserializes the RDD and function to be run.
-        task.metrics.setExecutorDeserializeTime(
-          (taskStartTime - deserializeStartTime) + task.executorDeserializeTime)
+        task.metrics.setExecutorDeserializeTime(TimeUnit.NANOSECONDS.toMillis(
+          (taskStartTimeNs - deserializeStartTimeNs) + task.executorDeserializeTimeNs))
         task.metrics.setExecutorDeserializeCpuTime(
           (taskStartCpu - deserializeStartCpuTime) + task.executorDeserializeCpuTime)
         // We need to subtract Task.run()'s deserialization time to avoid double-counting
-        task.metrics.setExecutorRunTime((taskFinish - taskStartTime) - task.executorDeserializeTime)
+        task.metrics.setExecutorRunTime(TimeUnit.NANOSECONDS.toMillis(
+          (taskFinishNs - taskStartTimeNs) - task.executorDeserializeTimeNs))
         task.metrics.setExecutorCpuTime(
           (taskFinishCpu - taskStartCpu) - task.executorDeserializeCpuTime)
         task.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime)
-        task.metrics.setResultSerializationTime(afterSerialization - beforeSerialization)
+        task.metrics.setResultSerializationTime(TimeUnit.NANOSECONDS.toMillis(
+          afterSerializationNs - beforeSerializationNs))
 
         // Expose task metrics using the Dropwizard metrics system.
         // Update task metrics counters
@@ -560,7 +563,7 @@ private[spark] class Executor(
         case t: TaskKilledException =>
           logInfo(s"Executor killed $taskName (TID $taskId), reason: ${t.reason}")
 
-          val (accums, accUpdates) = collectAccumulatorsAndResetStatusOnFailure(taskStartTime)
+          val (accums, accUpdates) = collectAccumulatorsAndResetStatusOnFailure(taskStartTimeNs)
           val serializedTK = ser.serialize(TaskKilled(t.reason, accUpdates, accums))
           execBackend.statusUpdate(taskId, TaskState.KILLED, serializedTK)
 
@@ -569,7 +572,7 @@ private[spark] class Executor(
           val killReason = task.reasonIfKilled.getOrElse("unknown reason")
           logInfo(s"Executor interrupted and killed $taskName (TID $taskId), reason: $killReason")
 
-          val (accums, accUpdates) = collectAccumulatorsAndResetStatusOnFailure(taskStartTime)
+          val (accums, accUpdates) = collectAccumulatorsAndResetStatusOnFailure(taskStartTimeNs)
           val serializedTK = ser.serialize(TaskKilled(killReason, accUpdates, accums))
           execBackend.statusUpdate(taskId, TaskState.KILLED, serializedTK)
 
@@ -604,7 +607,7 @@ private[spark] class Executor(
           // the task failure would not be ignored if the shutdown happened because of premption,
           // instead of an app issue).
           if (!ShutdownHookManager.inShutdown()) {
-            val (accums, accUpdates) = collectAccumulatorsAndResetStatusOnFailure(taskStartTime)
+            val (accums, accUpdates) = collectAccumulatorsAndResetStatusOnFailure(taskStartTimeNs)
 
             val serializedTaskEndReason = {
               try {
@@ -669,14 +672,16 @@ private[spark] class Executor(
 
     private[this] val killPollingIntervalMs: Long = conf.get(TASK_REAPER_POLLING_INTERVAL)
 
-    private[this] val killTimeoutMs: Long = conf.get(TASK_REAPER_KILL_TIMEOUT)
+    private[this] val killTimeoutNs: Long = {
+      TimeUnit.MILLISECONDS.toNanos(conf.get(TASK_REAPER_KILL_TIMEOUT))
+    }
 
     private[this] val takeThreadDump: Boolean = conf.get(TASK_REAPER_THREAD_DUMP)
 
     override def run(): Unit = {
-      val startTimeMs = System.currentTimeMillis()
-      def elapsedTimeMs = System.currentTimeMillis() - startTimeMs
-      def timeoutExceeded(): Boolean = killTimeoutMs > 0 && elapsedTimeMs > killTimeoutMs
+      val startTimeNs = System.nanoTime()
+      def elapsedTimeNs = System.nanoTime() - startTimeNs
+      def timeoutExceeded(): Boolean = killTimeoutNs > 0 && elapsedTimeNs > killTimeoutNs
       try {
         // Only attempt to kill the task once. If interruptThread = false then a second kill
         // attempt would be a no-op and if interruptThread = true then it may not be safe or
@@ -701,6 +706,7 @@ private[spark] class Executor(
           if (taskRunner.isFinished) {
             finished = true
           } else {
+            val elapsedTimeMs = TimeUnit.NANOSECONDS.toMillis(elapsedTimeNs)
             logWarning(s"Killed task $taskId is still running after $elapsedTimeMs ms")
             if (takeThreadDump) {
               try {
@@ -718,6 +724,7 @@ private[spark] class Executor(
         }
 
         if (!taskRunner.isFinished && timeoutExceeded()) {
+          val killTimeoutMs = TimeUnit.NANOSECONDS.toMillis(killTimeoutNs)
           if (isLocal) {
             logError(s"Killed task $taskId could not be stopped within $killTimeoutMs ms; " +
               "not killing JVM because we are running in local mode.")
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
index aafeae0..857c89d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
@@ -75,14 +75,14 @@ private[spark] class ResultTask[T, U](
   override def runTask(context: TaskContext): U = {
     // Deserialize the RDD and the func using the broadcast variables.
     val threadMXBean = ManagementFactory.getThreadMXBean
-    val deserializeStartTime = System.currentTimeMillis()
+    val deserializeStartTimeNs = System.nanoTime()
     val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
       threadMXBean.getCurrentThreadCpuTime
     } else 0L
     val ser = SparkEnv.get.closureSerializer.newInstance()
     val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
       ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
-    _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
+    _executorDeserializeTimeNs = System.nanoTime() - deserializeStartTimeNs
     _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
       threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
     } else 0L
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
index 35664ff..189e35e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
@@ -80,14 +80,14 @@ private[spark] class ShuffleMapTask(
   override def runTask(context: TaskContext): MapStatus = {
     // Deserialize the RDD using the broadcast variable.
     val threadMXBean = ManagementFactory.getThreadMXBean
-    val deserializeStartTime = System.currentTimeMillis()
+    val deserializeStartTimeNs = System.nanoTime()
     val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
       threadMXBean.getCurrentThreadCpuTime
     } else 0L
     val ser = SparkEnv.get.closureSerializer.newInstance()
     val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
       ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
-    _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
+    _executorDeserializeTimeNs = System.nanoTime() - deserializeStartTimeNs
     _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
       threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
     } else 0L
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index eb059f1..c17a903 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -182,7 +182,7 @@ private[spark] abstract class Task[T](
   // context is not yet initialized when kill() is invoked.
   @volatile @transient private var _reasonIfKilled: String = null
 
-  protected var _executorDeserializeTime: Long = 0
+  protected var _executorDeserializeTimeNs: Long = 0
   protected var _executorDeserializeCpuTime: Long = 0
 
   /**
@@ -193,7 +193,7 @@ private[spark] abstract class Task[T](
   /**
    * Returns the amount of time spent deserializing the RDD and function to be run.
    */
-  def executorDeserializeTime: Long = _executorDeserializeTime
+  def executorDeserializeTimeNs: Long = _executorDeserializeTimeNs
   def executorDeserializeCpuTime: Long = _executorDeserializeCpuTime
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index fb28e90..6bba157 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -63,9 +63,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     math.min(1, conf.get(SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO).getOrElse(0.0))
   // Submit tasks after maxRegisteredWaitingTime milliseconds
   // if minRegisteredRatio has not yet been reached
-  private val maxRegisteredWaitingTimeMs =
-    conf.get(SCHEDULER_MAX_REGISTERED_RESOURCE_WAITING_TIME)
-  private val createTime = System.currentTimeMillis()
+  private val maxRegisteredWaitingTimeNs = TimeUnit.MILLISECONDS.toNanos(
+    conf.get(SCHEDULER_MAX_REGISTERED_RESOURCE_WAITING_TIME))
+  private val createTimeNs = System.nanoTime()
 
   // Accessing `executorDataMap` in `DriverEndpoint.receive/receiveAndReply` doesn't need any
   // protection. But accessing `executorDataMap` out of `DriverEndpoint.receive/receiveAndReply`
@@ -499,9 +499,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
         s"reached minRegisteredResourcesRatio: $minRegisteredRatio")
       return true
     }
-    if ((System.currentTimeMillis() - createTime) >= maxRegisteredWaitingTimeMs) {
+    if ((System.nanoTime() - createTimeNs) >= maxRegisteredWaitingTimeNs) {
       logInfo("SchedulerBackend is ready for scheduling beginning after waiting " +
-        s"maxRegisteredResourcesWaitingTime: $maxRegisteredWaitingTimeMs(ms)")
+        s"maxRegisteredResourcesWaitingTime: $maxRegisteredWaitingTimeNs(ns)")
       return true
     }
     false
diff --git a/core/src/main/scala/org/apache/spark/security/CryptoStreamUtils.scala b/core/src/main/scala/org/apache/spark/security/CryptoStreamUtils.scala
index 18b735b..a4df0d5 100644
--- a/core/src/main/scala/org/apache/spark/security/CryptoStreamUtils.scala
+++ b/core/src/main/scala/org/apache/spark/security/CryptoStreamUtils.scala
@@ -20,6 +20,7 @@ import java.io.{Closeable, InputStream, IOException, OutputStream}
 import java.nio.ByteBuffer
 import java.nio.channels.{ReadableByteChannel, WritableByteChannel}
 import java.util.Properties
+import java.util.concurrent.TimeUnit
 import javax.crypto.KeyGenerator
 import javax.crypto.spec.{IvParameterSpec, SecretKeySpec}
 
@@ -133,10 +134,10 @@ private[spark] object CryptoStreamUtils extends Logging {
    */
   private[this] def createInitializationVector(properties: Properties): Array[Byte] = {
     val iv = new Array[Byte](IV_LENGTH_IN_BYTES)
-    val initialIVStart = System.currentTimeMillis()
+    val initialIVStart = System.nanoTime()
     CryptoRandomFactory.getCryptoRandom(properties).nextBytes(iv)
-    val initialIVFinish = System.currentTimeMillis()
-    val initialIVTime = initialIVFinish - initialIVStart
+    val initialIVFinish = System.nanoTime()
+    val initialIVTime = TimeUnit.NANOSECONDS.toMillis(initialIVFinish - initialIVStart)
     if (initialIVTime > 2000) {
       logWarning(s"It costs ${initialIVTime} milliseconds to create the Initialization Vector " +
         s"used by CryptoStream")
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 497a5f7..79e9ee7 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -22,7 +22,7 @@ import java.lang.ref.{ReferenceQueue => JReferenceQueue, WeakReference}
 import java.nio.ByteBuffer
 import java.nio.channels.Channels
 import java.util.Collections
-import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
 
 import scala.collection.mutable
 import scala.collection.mutable.HashMap
@@ -210,7 +210,7 @@ private[spark] class BlockManager(
   // Field related to peer block managers that are necessary for block replication
   @volatile private var cachedPeers: Seq[BlockManagerId] = _
   private val peerFetchLock = new Object
-  private var lastPeerFetchTime = 0L
+  private var lastPeerFetchTimeNs = 0L
 
   private var blockReplicationPolicy: BlockReplicationPolicy = _
 
@@ -558,9 +558,9 @@ private[spark] class BlockManager(
    * Get locations of an array of blocks.
    */
   private def getLocationBlockIds(blockIds: Array[BlockId]): Array[Seq[BlockManagerId]] = {
-    val startTimeMs = System.currentTimeMillis
+    val startTimeNs = System.nanoTime()
     val locations = master.getLocations(blockIds).toArray
-    logDebug("Got multiple block location in %s".format(Utils.getUsedTimeMs(startTimeMs)))
+    logDebug(s"Got multiple block location in ${Utils.getUsedTimeNs(startTimeNs)}")
     locations
   }
 
@@ -978,7 +978,7 @@ private[spark] class BlockManager(
       tellMaster: Boolean = true,
       keepReadLock: Boolean = false): Boolean = {
     doPut(blockId, level, classTag, tellMaster = tellMaster, keepReadLock = keepReadLock) { info =>
-      val startTimeMs = System.currentTimeMillis
+      val startTimeNs = System.nanoTime()
       // Since we're storing bytes, initiate the replication before storing them locally.
       // This is faster as data is already serialized and ready to send.
       val replicationFuture = if (level.replication > 1) {
@@ -1038,7 +1038,7 @@ private[spark] class BlockManager(
         }
         addUpdatedBlockStatusToTaskMetrics(blockId, putBlockStatus)
       }
-      logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))
+      logDebug(s"Put block ${blockId} locally took ${Utils.getUsedTimeNs(startTimeNs)}")
       if (level.replication > 1) {
         // Wait for asynchronous replication to finish
         try {
@@ -1086,7 +1086,7 @@ private[spark] class BlockManager(
       }
     }
 
-    val startTimeMs = System.currentTimeMillis
+    val startTimeNs = System.nanoTime()
     var exceptionWasThrown: Boolean = true
     val result: Option[T] = try {
       val res = putBody(putBlockInfo)
@@ -1125,12 +1125,11 @@ private[spark] class BlockManager(
         addUpdatedBlockStatusToTaskMetrics(blockId, BlockStatus.empty)
       }
     }
+    val usedTimeMs = Utils.getUsedTimeNs(startTimeNs)
     if (level.replication > 1) {
-      logDebug("Putting block %s with replication took %s"
-        .format(blockId, Utils.getUsedTimeMs(startTimeMs)))
+      logDebug(s"Putting block ${blockId} with replication took $usedTimeMs")
     } else {
-      logDebug("Putting block %s without replication took %s"
-        .format(blockId, Utils.getUsedTimeMs(startTimeMs)))
+      logDebug(s"Putting block ${blockId} without replication took ${usedTimeMs}")
     }
     result
   }
@@ -1155,7 +1154,7 @@ private[spark] class BlockManager(
       tellMaster: Boolean = true,
       keepReadLock: Boolean = false): Option[PartiallyUnrolledIterator[T]] = {
     doPut(blockId, level, classTag, tellMaster = tellMaster, keepReadLock = keepReadLock) { info =>
-      val startTimeMs = System.currentTimeMillis
+      val startTimeNs = System.nanoTime()
       var iteratorFromFailedMemoryStorePut: Option[PartiallyUnrolledIterator[T]] = None
       // Size of the block in bytes
       var size = 0L
@@ -1215,9 +1214,9 @@ private[spark] class BlockManager(
           reportBlockStatus(blockId, putBlockStatus)
         }
         addUpdatedBlockStatusToTaskMetrics(blockId, putBlockStatus)
-        logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))
+        logDebug(s"Put block $blockId locally took ${Utils.getUsedTimeNs(startTimeNs)}")
         if (level.replication > 1) {
-          val remoteStartTime = System.currentTimeMillis
+          val remoteStartTimeNs = System.nanoTime()
           val bytesToReplicate = doGetLocalBytes(blockId, info)
           // [SPARK-16550] Erase the typed classTag when using default serialization, since
           // NettyBlockRpcServer crashes when deserializing repl-defined classes.
@@ -1232,8 +1231,7 @@ private[spark] class BlockManager(
           } finally {
             bytesToReplicate.dispose()
           }
-          logDebug("Put block %s remotely took %s"
-            .format(blockId, Utils.getUsedTimeMs(remoteStartTime)))
+          logDebug(s"Put block $blockId remotely took ${Utils.getUsedTimeNs(remoteStartTimeNs)}")
         }
       }
       assert(blockWasSuccessfullyStored == iteratorFromFailedMemoryStorePut.isEmpty)
@@ -1331,10 +1329,11 @@ private[spark] class BlockManager(
   private def getPeers(forceFetch: Boolean): Seq[BlockManagerId] = {
     peerFetchLock.synchronized {
       val cachedPeersTtl = conf.get(config.STORAGE_CACHED_PEERS_TTL) // milliseconds
-      val timeout = System.currentTimeMillis - lastPeerFetchTime > cachedPeersTtl
+      val diff = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - lastPeerFetchTimeNs)
+      val timeout = diff > cachedPeersTtl
       if (cachedPeers == null || forceFetch || timeout) {
         cachedPeers = master.getPeers(blockManagerId).sortBy(_.hashCode)
-        lastPeerFetchTime = System.currentTimeMillis
+        lastPeerFetchTimeNs = System.nanoTime()
         logDebug("Fetched peers from master: " + cachedPeers.mkString("[", ",", "]"))
       }
       cachedPeers
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index 36cbaeb..aefa2ae 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -21,7 +21,7 @@ import java.io._
 import java.nio.ByteBuffer
 import java.nio.channels.{Channels, ReadableByteChannel, WritableByteChannel}
 import java.nio.channels.FileChannel.MapMode
-import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
 
 import scala.collection.mutable.ListBuffer
 
@@ -61,7 +61,7 @@ private[spark] class DiskStore(
       throw new IllegalStateException(s"Block $blockId is already present in the disk store")
     }
     logDebug(s"Attempting to put block $blockId")
-    val startTime = System.currentTimeMillis
+    val startTimeNs = System.nanoTime()
     val file = diskManager.getFile(blockId)
     val out = new CountingWritableChannel(openForWrite(file))
     var threwException: Boolean = true
@@ -84,11 +84,8 @@ private[spark] class DiskStore(
         }
       }
     }
-    val finishTime = System.currentTimeMillis
-    logDebug("Block %s stored as %s file on disk in %d ms".format(
-      file.getName,
-      Utils.bytesToString(file.length()),
-      finishTime - startTime))
+    logDebug(s"Block ${file.getName} stored as ${Utils.bytesToString(file.length())} file" +
+      s" on disk in ${TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNs)} ms")
   }
 
   def putBytes(blockId: BlockId, bytes: ChunkedByteBuffer): Unit = {
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index 3966980..c75b209 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -19,7 +19,7 @@ package org.apache.spark.storage
 
 import java.io.{InputStream, IOException}
 import java.nio.ByteBuffer
-import java.util.concurrent.LinkedBlockingQueue
+import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
 import javax.annotation.concurrent.GuardedBy
 
 import scala.collection.mutable
@@ -92,7 +92,7 @@ final class ShuffleBlockFetcherIterator(
    */
   private[this] var numBlocksProcessed = 0
 
-  private[this] val startTime = System.currentTimeMillis
+  private[this] val startTimeNs = System.nanoTime()
 
   /** Local blocks to fetch, excluding zero-sized blocks. */
   private[this] val localBlocks = scala.collection.mutable.LinkedHashSet[BlockId]()
@@ -250,7 +250,7 @@ final class ShuffleBlockFetcherIterator(
             logDebug("remainingBlocks: " + remainingBlocks)
           }
         }
-        logTrace("Got remote block " + blockId + " after " + Utils.getUsedTimeMs(startTime))
+        logTrace(s"Got remote block $blockId after ${Utils.getUsedTimeNs(startTimeNs)}")
       }
 
       override def onBlockFetchFailure(blockId: String, e: Throwable): Unit = {
@@ -380,11 +380,11 @@ final class ShuffleBlockFetcherIterator(
     fetchUpToMaxBytes()
 
     val numFetches = remoteRequests.size - fetchRequests.size
-    logInfo("Started " + numFetches + " remote fetches in" + Utils.getUsedTimeMs(startTime))
+    logInfo(s"Started $numFetches remote fetches in ${Utils.getUsedTimeNs(startTimeNs)}")
 
     // Get Local Blocks
     fetchLocalBlocks()
-    logDebug("Got local blocks in " + Utils.getUsedTimeMs(startTime))
+    logDebug(s"Got local blocks in ${Utils.getUsedTimeNs(startTimeNs)}")
   }
 
   override def hasNext: Boolean = !isZombie && (numBlocksProcessed < numBlocksToFetch)
@@ -411,10 +411,10 @@ final class ShuffleBlockFetcherIterator(
     // is also corrupt, so the previous stage could be retried.
     // For local shuffle block, throw FailureFetchResult for the first IOException.
     while (!isZombie && result == null) {
-      val startFetchWait = System.currentTimeMillis()
+      val startFetchWait = System.nanoTime()
       result = results.take()
-      val stopFetchWait = System.currentTimeMillis()
-      shuffleMetrics.incFetchWaitTime(stopFetchWait - startFetchWait)
+      val fetchWaitTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startFetchWait)
+      shuffleMetrics.incFetchWaitTime(fetchWaitTime)
 
       result match {
         case r @ SuccessFetchResult(blockId, address, size, buf, isNetworkReqDone) =>
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 37b21e2..3065bdf 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1005,9 +1005,10 @@ private[spark] object Utils extends Logging {
 
   /**
    * Return the string to tell how long has passed in milliseconds.
+   * @param startTimeNs - a timestamp in nanoseconds returned by `System.nanoTime`.
    */
-  def getUsedTimeMs(startTimeMs: Long): String = {
-    " " + (System.currentTimeMillis - startTimeMs) + " ms"
+  def getUsedTimeNs(startTimeNs: Long): String = {
+    s"${TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNs)} ms"
   }
 
   /**
@@ -1738,23 +1739,23 @@ private[spark] object Utils extends Logging {
    *
    * @param numIters number of iterations
    * @param f function to be executed. If prepare is not None, the running time of each call to f
-   *          must be an order of magnitude longer than one millisecond for accurate timing.
+   *          must be an order of magnitude longer than one nanosecond for accurate timing.
    * @param prepare function to be executed before each call to f. Its running time doesn't count.
-   * @return the total time across all iterations (not counting preparation time)
+   * @return the total time across all iterations (not counting preparation time) in nanoseconds.
    */
   def timeIt(numIters: Int)(f: => Unit, prepare: Option[() => Unit] = None): Long = {
     if (prepare.isEmpty) {
-      val start = System.currentTimeMillis
+      val startNs = System.nanoTime()
       times(numIters)(f)
-      System.currentTimeMillis - start
+      System.nanoTime() - startNs
     } else {
       var i = 0
       var sum = 0L
       while (i < numIters) {
         prepare.get.apply()
-        val start = System.currentTimeMillis
+        val startNs = System.nanoTime()
         f
-        sum += System.currentTimeMillis - start
+        sum += System.nanoTime() - startNs
         i += 1
       }
       sum
diff --git a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
index b9b47d4..d4bf20c 100644
--- a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark
 
 import java.lang.ref.WeakReference
+import java.util.concurrent.TimeUnit
 
 import scala.collection.mutable.HashSet
 import scala.language.existentials
@@ -98,10 +99,10 @@ abstract class ContextCleanerSuiteBase(val shuffleManager: Class[_] = classOf[So
   /** Run GC and make sure it actually has run */
   protected def runGC() {
     val weakRef = new WeakReference(new Object())
-    val startTime = System.currentTimeMillis
+    val startTimeNs = System.nanoTime()
     System.gc() // Make a best effort to run the garbage collection. It *usually* runs GC.
     // Wait until a weak reference object has been GCed
-    while (System.currentTimeMillis - startTime < 10000 && weakRef.get != null) {
+    while (System.nanoTime() - startTimeNs < TimeUnit.SECONDS.toNanos(10) && weakRef.get != null) {
       System.gc()
       Thread.sleep(200)
     }
diff --git a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
index 0c59259..00fecd4 100644
--- a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark
 
-import java.util.concurrent.Semaphore
+import java.util.concurrent.{Semaphore, TimeUnit}
 import java.util.concurrent.atomic.AtomicInteger
 
 import scala.concurrent.ExecutionContext.Implicits.global
@@ -273,8 +273,8 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
     val jobA = Future {
       sc.setJobGroup("jobA", "this is a job to be cancelled", interruptOnCancel = true)
       sc.parallelize(1 to 2, 2).map { i =>
-        val startTime = System.currentTimeMillis()
-        while (System.currentTimeMillis() < startTime + 10000) { }
+        val startTimeNs = System.nanoTime()
+        while (System.nanoTime() < startTimeNs + TimeUnit.SECONDS.toNanos(10)) { }
       }.count()
     }
 
diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
index c187bcb..ea31a75 100644
--- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -170,8 +170,8 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst
     }, 0, 1, TimeUnit.MILLISECONDS)
 
     try {
-      val t0 = System.currentTimeMillis()
-      while ((System.currentTimeMillis() - t0) < 1000) {
+      val t0 = System.nanoTime()
+      while ((System.nanoTime() - t0) < TimeUnit.SECONDS.toNanos(1)) {
         val conf = Try(new SparkConf(loadDefaults = true))
         assert(conf.isSuccess === true)
       }
diff --git a/core/src/test/scala/org/apache/spark/ThreadingSuite.scala b/core/src/test/scala/org/apache/spark/ThreadingSuite.scala
index 36273d7..5cf9c08 100644
--- a/core/src/test/scala/org/apache/spark/ThreadingSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ThreadingSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark
 
-import java.util.concurrent.Semaphore
+import java.util.concurrent.{Semaphore, TimeUnit}
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
 
 import org.apache.spark.internal.Logging
@@ -126,8 +126,9 @@ class ThreadingSuite extends SparkFunSuite with LocalSparkContext with Logging {
             val ans = nums.map(number => {
               val running = ThreadingSuiteState.runningThreads
               running.getAndIncrement()
-              val time = System.currentTimeMillis()
-              while (running.get() != 4 && System.currentTimeMillis() < time + 1000) {
+              val timeNs = System.nanoTime()
+              while (running.get() != 4 &&
+                (System.nanoTime() - timeNs < TimeUnit.SECONDS.toNanos(1))) {
                 Thread.sleep(100)
               }
               if (running.get() != 4) {
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index 0086a79..6c2159e 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -632,7 +632,7 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
     }
     val time = Utils.timeIt(2)({}, Some(prepare))
     require(cnt === 2, "prepare should be called twice")
-    require(time < 500, "preparation time should not count")
+    require(time < TimeUnit.MILLISECONDS.toNanos(500), "preparation time should not count")
   }
 
   test("fetch hcfs dir") {
@@ -907,7 +907,7 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
 
       // Start up a process that runs 'sleep 10'. Terminate the process and assert it takes
       // less time and the process is no longer there.
-      val startTimeMs = System.currentTimeMillis()
+      val startTimeNs = System.nanoTime()
       val process = new ProcessBuilder("sleep", "10").start()
       val pid = getPid(process)
       try {
@@ -915,8 +915,8 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
         val terminated = Utils.terminateProcess(process, 5000)
         assert(terminated.isDefined)
         process.waitFor(5, TimeUnit.SECONDS)
-        val durationMs = System.currentTimeMillis() - startTimeMs
-        assert(durationMs < 5000)
+        val durationNs = System.nanoTime() - startTimeNs
+        assert(durationNs < TimeUnit.SECONDS.toNanos(5))
         assert(!pidExists(pid))
       } finally {
         // Forcibly kill the test process just in case.
@@ -943,12 +943,13 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
         assert(pidExists(pid))
         try {
           signal(pid, "SIGSTOP")
-          val start = System.currentTimeMillis()
+          val startNs = System.nanoTime()
           val terminated = Utils.terminateProcess(process, 5000)
           assert(terminated.isDefined)
           process.waitFor(5, TimeUnit.SECONDS)
-          val duration = System.currentTimeMillis() - start
-          assert(duration < 6000) // add a little extra time to allow a force kill to finish
+          val duration = System.nanoTime() - startNs
+          // add a little extra time to allow a force kill to finish
+          assert(duration < TimeUnit.SECONDS.toNanos(6))
           assert(!pidExists(pid))
         } finally {
           signal(pid, "SIGKILL")
diff --git a/core/src/test/scala/org/apache/spark/util/collection/SizeTrackerSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/SizeTrackerSuite.scala
index 4f38241..4759a83 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/SizeTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/SizeTrackerSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.util.collection
 
+import java.util.concurrent.TimeUnit
+
 import scala.reflect.ClassTag
 import scala.util.Random
 
@@ -192,9 +194,9 @@ private object SizeTrackerSuite {
   }
 
   def time(f: => Unit): Long = {
-    val start = System.currentTimeMillis()
+    val startNs = System.nanoTime()
     f
-    System.currentTimeMillis() - start
+    TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs)
   }
 
   def averageTime(v: Seq[Long]): Long = {
diff --git a/core/src/test/scala/org/apache/spark/util/collection/SorterSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/SorterSuite.scala
index 46a05e2..a14fb76 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/SorterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/SorterSuite.scala
@@ -19,9 +19,11 @@ package org.apache.spark.util.collection
 
 import java.lang.{Float => JFloat}
 import java.util.{Arrays, Comparator}
+import java.util.concurrent.TimeUnit
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils.timeIt
 import org.apache.spark.util.random.XORShiftRandom
 
 class SorterSuite extends SparkFunSuite with Logging {
@@ -79,13 +81,13 @@ class SorterSuite extends SparkFunSuite with Logging {
       return
     }
 
-    val firstTry = org.apache.spark.util.Utils.timeIt(1)(f, Some(prepare))
+    val firstTry = TimeUnit.NANOSECONDS.toMillis(timeIt(1)(f, Some(prepare)))
     System.gc()
 
     var i = 0
     var next10: Long = 0
     while (i < 10) {
-      val time = org.apache.spark.util.Utils.timeIt(1)(f, Some(prepare))
+      val time = TimeUnit.NANOSECONDS.toMillis(timeIt(1)(f, Some(prepare)))
       next10 += time
       logInfo(s"$name: Took $time ms")
       i += 1
diff --git a/examples/src/main/scala/org/apache/spark/examples/HdfsTest.scala b/examples/src/main/scala/org/apache/spark/examples/HdfsTest.scala
index 08af330..b327e13 100644
--- a/examples/src/main/scala/org/apache/spark/examples/HdfsTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/HdfsTest.scala
@@ -18,6 +18,8 @@
 // scalastyle:off println
 package org.apache.spark.examples
 
+import java.util.concurrent.TimeUnit
+
 import org.apache.spark.sql.SparkSession
 
 
@@ -36,10 +38,10 @@ object HdfsTest {
     val file = spark.read.text(args(0)).rdd
     val mapped = file.map(s => s.length).cache()
     for (iter <- 1 to 10) {
-      val start = System.currentTimeMillis()
+      val startTimeNs = System.nanoTime()
       for (x <- mapped) { x + 2 }
-      val end = System.currentTimeMillis()
-      println(s"Iteration $iter took ${end-start} ms")
+      val durationMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNs)
+      println(s"Iteration $iter took $durationMs ms")
     }
     println(s"File contents: ${file.map(_.toString).take(1).mkString(",").slice(0, 10)}")
     println(s"Returned length(s) of: ${file.map(_.length).sum().toString}")
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDontFailOnDataLossSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDontFailOnDataLossSuite.scala
index 491a9c6..d9edf3c 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDontFailOnDataLossSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDontFailOnDataLossSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.kafka010
 
 import java.util.Properties
+import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicInteger
 
 import scala.collection.mutable
@@ -221,13 +222,13 @@ class KafkaSourceStressForDontFailOnDataLossSuite extends StreamTest with KafkaM
       .as[(String, String)]
     val query = startStream(kafka.map(kv => kv._2.toInt))
 
-    val testTime = 20.seconds
-    val startTime = System.currentTimeMillis()
+    val testTimeNs = TimeUnit.SECONDS.toNanos(20)
+    val startTimeNs = System.nanoTime()
     // Track the current existing topics
     val topics = mutable.ArrayBuffer[String]()
     // Track topics that have been deleted
     val deletedTopics = mutable.Set[String]()
-    while (System.currentTimeMillis() - testTime.toMillis < startTime) {
+    while (System.nanoTime() - startTimeNs < testTimeNs) {
       Random.nextInt(10) match {
         case 0 => // Create a new topic
           val topic = newTopic()
diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
index efcd5d6..fd9fd00 100644
--- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
+++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
@@ -21,7 +21,7 @@ import java.io.{File, IOException}
 import java.lang.{Integer => JInt}
 import java.net.InetSocketAddress
 import java.util.{Map => JMap, Properties}
-import java.util.concurrent.TimeoutException
+import java.util.concurrent.{TimeoutException, TimeUnit}
 
 import scala.annotation.tailrec
 import scala.collection.JavaConverters._
@@ -265,14 +265,14 @@ private[kafka010] class KafkaTestUtils extends Logging {
       }
     }
 
-    val startTime = System.currentTimeMillis()
+    val startTimeNs = System.nanoTime()
     @tailrec
     def tryAgain(attempt: Int): T = {
       makeAttempt() match {
         case Right(result) => result
         case Left(e) =>
-          val duration = System.currentTimeMillis() - startTime
-          if (duration < timeout.milliseconds) {
+          val durationMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNs)
+          if (durationMs < timeout.milliseconds) {
             Thread.sleep(interval.milliseconds)
           } else {
             throw new TimeoutException(e.getMessage)
diff --git a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
index 88b2942..5072b3a 100644
--- a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
+++ b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.streaming.kinesis
 
+import java.util.concurrent.TimeUnit
+
 import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 import scala.util.control.NonFatal
@@ -251,13 +253,16 @@ class KinesisSequenceRangeIterator(
 
   /** Helper method to retry Kinesis API request with exponential backoff and timeouts */
   private def retryOrTimeout[T](message: String)(body: => T): T = {
-    val startTimeMs = System.currentTimeMillis()
+    val startTimeNs = System.nanoTime()
     var retryCount = 0
     var result: Option[T] = None
     var lastError: Throwable = null
     var waitTimeInterval = kinesisReadConfigs.retryWaitTimeMs
 
-    def isTimedOut = (System.currentTimeMillis() - startTimeMs) >= kinesisReadConfigs.retryTimeoutMs
+    def isTimedOut = {
+      val retryTimeoutNs = TimeUnit.MILLISECONDS.toNanos(kinesisReadConfigs.retryTimeoutMs)
+      (System.nanoTime() - startTimeNs) >= retryTimeoutNs
+    }
     def isMaxRetryDone = retryCount >= kinesisReadConfigs.maxRetries
 
     while (result.isEmpty && !isTimedOut && !isMaxRetryDone) {
diff --git a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
index 73ac7a3..2ac83c8 100644
--- a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
+++ b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
@@ -183,9 +183,8 @@ private[kinesis] class KinesisTestUtils(streamShardCount: Int = 2) extends Loggi
   }
 
   private def waitForStreamToBeActive(streamNameToWaitFor: String): Unit = {
-    val startTime = System.currentTimeMillis()
-    val endTime = startTime + TimeUnit.SECONDS.toMillis(createStreamTimeoutSeconds)
-    while (System.currentTimeMillis() < endTime) {
+    val startTimeNs = System.nanoTime()
+    while (System.nanoTime() - startTimeNs < TimeUnit.SECONDS.toNanos(createStreamTimeoutSeconds)) {
       Thread.sleep(TimeUnit.SECONDS.toMillis(describeStreamPollTimeSeconds))
       describeStream(streamNameToWaitFor).foreach { description =>
         val streamStatus = description.getStreamStatus()
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala
index f665727..6aeea91 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.graphx
 
+import java.util.concurrent.TimeUnit
+
 import org.apache.spark.SparkContext
 import org.apache.spark.graphx.impl.{EdgePartitionBuilder, GraphImpl}
 import org.apache.spark.internal.Logging
@@ -63,7 +65,7 @@ object GraphLoader extends Logging {
       vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY)
     : Graph[Int, Int] =
   {
-    val startTime = System.currentTimeMillis
+    val startTimeNs = System.nanoTime()
 
     // Parse the edge data table directly into edge partitions
     val lines =
@@ -93,7 +95,8 @@ object GraphLoader extends Logging {
     }.persist(edgeStorageLevel).setName("GraphLoader.edgeListFile - edges (%s)".format(path))
     edges.count()
 
-    logInfo("It took %d ms to load the edges".format(System.currentTimeMillis - startTime))
+    logInfo(s"It took ${TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNs)} ms" +
+      " to load the edges")
 
     GraphImpl.fromEdgePartitions(edges, defaultVertexAttr = 1, edgeStorageLevel = edgeStorageLevel,
       vertexStorageLevel = vertexStorageLevel)
diff --git a/mllib/src/main/scala/org/apache/spark/ml/util/stopwatches.scala b/mllib/src/main/scala/org/apache/spark/ml/util/stopwatches.scala
index e539dec..ca23e6b 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/util/stopwatches.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/util/stopwatches.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.ml.util
 
+import java.util.concurrent.TimeUnit
+
 import scala.collection.mutable
 
 import org.apache.spark.SparkContext
@@ -73,7 +75,7 @@ private[spark] abstract class Stopwatch extends Serializable {
   /**
    * Gets the current time in milliseconds.
    */
-  protected def now: Long = System.currentTimeMillis()
+  protected def now: Long = TimeUnit.NANOSECONDS.toMillis(System.nanoTime())
 
   /**
    * Adds input duration to total elapsed time.
diff --git a/mllib/src/test/scala/org/apache/spark/ml/util/StopwatchSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/util/StopwatchSuite.scala
index 54e363a..de3726f 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/util/StopwatchSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/util/StopwatchSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.ml.util
 
 import java.util.Random
+import java.util.concurrent.TimeUnit
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.mllib.util.MLlibTestSparkContext
@@ -121,5 +122,5 @@ private object StopwatchSuite extends SparkFunSuite {
   }
 
   /** The current time in milliseconds. */
-  private def now: Long = System.currentTimeMillis()
+  private def now: Long = TimeUnit.NANOSECONDS.toMillis(System.nanoTime())
 }
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index 8bd61c2..6681ca3 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -19,6 +19,7 @@ package org.apache.spark.scheduler.cluster.mesos
 
 import java.io.File
 import java.util.{Collections, List => JList}
+import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
 import java.util.concurrent.locks.ReentrantLock
 
@@ -107,9 +108,9 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
   private var totalGpusAcquired = 0
 
   // The amount of time to wait for locality scheduling
-  private val localityWait = conf.get(config.LOCALITY_WAIT)
+  private val localityWaitNs = TimeUnit.MILLISECONDS.toNanos(conf.get(config.LOCALITY_WAIT))
   // The start of the waiting, for data local scheduling
-  private var localityWaitStartTime = System.currentTimeMillis()
+  private var localityWaitStartTimeNs = System.nanoTime()
   // If true, the scheduler is in the process of launching executors to reach the requested
   // executor limit
   private var launchingExecutors = false
@@ -357,7 +358,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
       } else {
         if (!launchingExecutors) {
           launchingExecutors = true
-          localityWaitStartTime = System.currentTimeMillis()
+          localityWaitStartTimeNs = System.nanoTime()
         }
       }
 
@@ -591,7 +592,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
     // increase coverage.
     val remainingHosts = allDesiredHosts -- currentHosts
     if (!remainingHosts.contains(offerHostname) &&
-      (System.currentTimeMillis() - localityWaitStartTime <= localityWait)) {
+      (System.nanoTime() - localityWaitStartTimeNs <= localityWaitNs)) {
       logDebug("Skipping host and waiting for locality. host: " + offerHostname)
       return false
     }
@@ -750,7 +751,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
     logInfo("Capping the total amount of executors to " + requestedTotal)
     executorLimitOption = Some(requestedTotal)
     // Update the locality wait start time to continue trying for locality.
-    localityWaitStartTime = System.currentTimeMillis()
+    localityWaitStartTimeNs = System.nanoTime()
     true
   }
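
In the Mesos backend the configured locality wait (milliseconds) is converted to nanoseconds once, so the offer-handling path only ever compares raw nanoTime deltas. A stripped-down sketch of that gate, with hypothetical names (waitMs, shouldKeepWaiting):

    import java.util.concurrent.TimeUnit

    // Simplified sketch; waitMs stands in for the configured locality wait.
    object LocalityWaitSketch {
      val waitMs = 3000L
      private val waitNs = TimeUnit.MILLISECONDS.toNanos(waitMs) // convert once, up front
      private var waitStartNs = System.nanoTime()

      // True while we should still hold out for a data-local offer.
      def shouldKeepWaiting(): Boolean = System.nanoTime() - waitStartNs <= waitNs

      // Restart the window, e.g. when the requested executor total changes.
      def restartWait(): Unit = waitStartNs = System.nanoTime()
    }
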
 
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 7458e13..1482b520 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -587,7 +587,7 @@ private[spark] class ApplicationMaster(
           }
           try {
             val numPendingAllocate = allocator.getPendingAllocate.size
-            var sleepStart = 0L
+            var sleepStartNs = 0L
             var sleepInterval = 200L // ms
             allocatorLock.synchronized {
               sleepInterval =
@@ -600,11 +600,11 @@ private[spark] class ApplicationMaster(
                   nextAllocationInterval = initialAllocationInterval
                   heartbeatInterval
                 }
-              sleepStart = System.currentTimeMillis()
+              sleepStartNs = System.nanoTime()
               allocatorLock.wait(sleepInterval)
             }
-            val sleepDuration = System.currentTimeMillis() - sleepStart
-            if (sleepDuration < sleepInterval) {
+            val sleepDuration = System.nanoTime() - sleepStartNs
+            if (sleepDuration < TimeUnit.MILLISECONDS.toNanos(sleepInterval)) {
               // log when sleep is interrupted
               logDebug(s"Number of pending allocations is $numPendingAllocate. " +
                   s"Slept for $sleepDuration/$sleepInterval ms.")
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
index 3a79131..49367e0 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
@@ -100,9 +100,9 @@ abstract class BaseYarnClusterSuite
     // This hack loops for a bit waiting for the port to change, and fails the test if it hasn't
     // done so in a timely manner (defined to be 10 seconds).
     val config = yarnCluster.getConfig()
-    val deadline = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(10)
+    val startTimeNs = System.nanoTime()
     while (config.get(YarnConfiguration.RM_ADDRESS).split(":")(1) == "0") {
-      if (System.currentTimeMillis() > deadline) {
+      if (System.nanoTime() - startTimeNs > TimeUnit.SECONDS.toNanos(10)) {
         throw new IllegalStateException("Timed out waiting for RM to come up.")
       }
       logDebug("RM address still not set in configuration, waiting...")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
index cb9ca4c..0bd8a92 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.streaming
 
 import java.util.UUID
+import java.util.concurrent.TimeUnit
 import javax.annotation.concurrent.GuardedBy
 
 import scala.collection.mutable
@@ -151,8 +152,10 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
   @throws[StreamingQueryException]
   def awaitAnyTermination(timeoutMs: Long): Boolean = {
 
-    val startTime = System.currentTimeMillis
-    def isTimedout = System.currentTimeMillis - startTime >= timeoutMs
+    val startTime = System.nanoTime()
+    def isTimedout = {
+      System.nanoTime() - startTime >= TimeUnit.MILLISECONDS.toNanos(timeoutMs)
+    }
 
     awaitTerminationLock.synchronized {
       while (!isTimedout && lastTerminatedQuery == null) {
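
awaitAnyTermination rechecks isTimedout inside the synchronized loop because Object.wait can return early, for example on a notification for a different query or a spurious wakeup. When building a similar timed wait it also helps to recompute the remaining time from the monotonic start on each pass; an illustrative waiter (not the Spark class):

    import java.util.concurrent.TimeUnit

    class TerminationWaiter {
      private val lock = new Object
      private var terminated = false

      def signalTermination(): Unit = lock.synchronized {
        terminated = true
        lock.notifyAll()
      }

      /** Returns true if termination was signalled within timeoutMs. */
      def await(timeoutMs: Long): Boolean = {
        val startNs = System.nanoTime()
        lock.synchronized {
          var remainingMs = timeoutMs
          while (!terminated && remainingMs > 0) {
            lock.wait(remainingMs)
            // Recompute from the monotonic start so spurious wakeups don't extend the wait.
            remainingMs = timeoutMs - TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs)
          }
          terminated
        }
      }
    }
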
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
index 16cb78a..d9e4842 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.hive.thriftserver
 
 import java.io._
 import java.util.{ArrayList => JArrayList, Locale}
+import java.util.concurrent.TimeUnit
 
 import scala.collection.JavaConverters._
 
@@ -343,10 +344,10 @@ private[hive] class SparkSQLCLIDriver extends CliDriver with Logging {
     }
     if (tokens(0).toLowerCase(Locale.ROOT).equals("source") ||
       cmd_trimmed.startsWith("!") || isRemoteMode) {
-      val start = System.currentTimeMillis()
+      val startTimeNs = System.nanoTime()
       super.processCmd(cmd)
-      val end = System.currentTimeMillis()
-      val timeTaken: Double = (end - start) / 1000.0
+      val endTimeNs = System.nanoTime()
+      val timeTaken: Double = TimeUnit.NANOSECONDS.toMillis(endTimeNs - startTimeNs) / 1000.0
       console.printInfo(s"Time taken: $timeTaken seconds")
       0
     } else {
@@ -364,13 +365,13 @@ private[hive] class SparkSQLCLIDriver extends CliDriver with Logging {
           driver.init()
           val out = sessionState.out
           val err = sessionState.err
-          val start: Long = System.currentTimeMillis()
+          val startTimeNs: Long = System.nanoTime()
           if (sessionState.getIsVerbose) {
             out.println(cmd)
           }
           val rc = driver.run(cmd)
-          val end = System.currentTimeMillis()
-          val timeTaken: Double = (end - start) / 1000.0
+          val endTimeNs = System.nanoTime()
+          val timeTaken: Double = TimeUnit.NANOSECONDS.toMillis(endTimeNs - startTimeNs) / 1000.0
 
           ret = rc.getResponseCode
           if (ret != 0) {
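
The CLI driver reports elapsed seconds as a Double. Converting the nanosecond delta to milliseconds first, as the patch does, truncates to millisecond precision, which is fine for a "Time taken" message; dividing the raw delta by 1e9 keeps sub-millisecond precision and is shown below only as an aside, not as what the patch does:

    import java.util.concurrent.TimeUnit

    object TimeTakenSketch {
      def main(args: Array[String]): Unit = {
        val startTimeNs = System.nanoTime()
        Thread.sleep(123) // stand-in for running the command
        val deltaNs = System.nanoTime() - startTimeNs
        val seconds = TimeUnit.NANOSECONDS.toMillis(deltaNs) / 1000.0 // ms precision, as in the patch
        val secondsFull = deltaNs / 1e9                               // alternative: full precision
        println(f"Time taken: $seconds%.3f seconds ($secondsFull%.6f with full precision)")
      }
    }
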
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index e042ada..54f91ff 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -218,7 +218,7 @@ class CheckpointWriter(
         latestCheckpointTime = checkpointTime
       }
       var attempts = 0
-      val startTime = System.currentTimeMillis()
+      val startTimeNs = System.nanoTime()
       val tempFile = new Path(checkpointDir, "temp")
       // We will do checkpoint when generating a batch and completing a batch. When the processing
       // time of a batch is greater than the batch interval, checkpointing for completing an old
@@ -272,9 +272,9 @@ class CheckpointWriter(
           }
 
           // All done, print success
-          val finishTime = System.currentTimeMillis()
           logInfo(s"Checkpoint for time $checkpointTime saved to file '$checkpointFile'" +
-            s", took ${bytes.length} bytes and ${finishTime - startTime} ms")
+            s", took ${bytes.length} bytes and " +
+            s"${TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNs)} ms")
           jobGenerator.onCheckpointCompletion(checkpointTime, clearCheckpointDataLater)
           return
         } catch {
@@ -304,14 +304,13 @@ class CheckpointWriter(
     if (stopped) return
 
     executor.shutdown()
-    val startTime = System.currentTimeMillis()
+    val startTimeNs = System.nanoTime()
     val terminated = executor.awaitTermination(10, java.util.concurrent.TimeUnit.SECONDS)
     if (!terminated) {
       executor.shutdownNow()
     }
-    val endTime = System.currentTimeMillis()
     logInfo(s"CheckpointWriter executor terminated? $terminated," +
-      s" waited for ${endTime - startTime} ms.")
+      s" waited for ${TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNs)} ms.")
     stopped = true
   }
 }
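
CheckpointWriter times how long it actually waited for its executor to terminate. A self-contained sketch of that shutdown-and-report pattern, using a plain ExecutorService rather than the Spark class:

    import java.util.concurrent.{Executors, TimeUnit}

    object ShutdownTimingSketch {
      def main(args: Array[String]): Unit = {
        val executor = Executors.newSingleThreadExecutor()
        executor.submit(new Runnable { def run(): Unit = Thread.sleep(500) })

        executor.shutdown()
        val startNs = System.nanoTime()
        val terminated = executor.awaitTermination(10, TimeUnit.SECONDS)
        if (!terminated) executor.shutdownNow()
        val waitedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs)
        println(s"Executor terminated? $terminated, waited for $waitedMs ms.")
      }
    }
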
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index 6f0b46b..4f3fa05 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.streaming.scheduler
 
+import java.util.concurrent.TimeUnit
+
 import scala.util.{Failure, Success, Try}
 
 import org.apache.spark.internal.Logging
@@ -111,14 +113,15 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
 
     if (processReceivedData) {
       logInfo("Stopping JobGenerator gracefully")
-      val timeWhenStopStarted = System.currentTimeMillis()
+      val timeWhenStopStarted = System.nanoTime()
       val stopTimeoutMs = conf.getTimeAsMs(
         "spark.streaming.gracefulStopTimeout", s"${10 * ssc.graph.batchDuration.milliseconds}ms")
       val pollTime = 100
 
       // To prevent graceful stop to get stuck permanently
       def hasTimedOut: Boolean = {
-        val timedOut = (System.currentTimeMillis() - timeWhenStopStarted) > stopTimeoutMs
+        val diff = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - timeWhenStopStarted)
+        val timedOut = diff > stopTimeoutMs
         if (timedOut) {
           logWarning("Timed out while stopping the job generator (timeout = " + stopTimeoutMs + ")")
         }
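
JobGenerator converts the elapsed nanoseconds down to milliseconds before comparing against the configured stopTimeoutMs, whereas other hunks in this patch convert the millisecond budget up to nanoseconds instead. The two forms differ only by sub-millisecond truncation; a tiny sketch of both, with a hypothetical timeoutMs:

    import java.util.concurrent.TimeUnit

    object TimeoutComparisonSketch {
      val timeoutMs = 2000L
      val startNs = System.nanoTime()

      // Form used here: convert the elapsed delta down to the budget's unit.
      def timedOutA: Boolean =
        TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs) > timeoutMs

      // Form used elsewhere in the patch: convert the budget up to the clock's unit.
      def timedOutB: Boolean =
        System.nanoTime() - startNs > TimeUnit.MILLISECONDS.toNanos(timeoutMs)
    }
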
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java
index 9156047..e9b35ac 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java
@@ -39,6 +39,7 @@ import java.io.Serializable;
 import java.net.ConnectException;
 import java.net.Socket;
 import java.nio.charset.StandardCharsets;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 public class JavaReceiverAPISuite implements Serializable {
@@ -71,15 +72,15 @@ public class JavaReceiverAPISuite implements Serializable {
       });
 
       ssc.start();
-      long startTime = System.currentTimeMillis();
-      long timeout = 10000;
+      long startTimeNs = System.nanoTime();
+      long timeout = TimeUnit.SECONDS.toNanos(10);
 
       Thread.sleep(200);
       for (int i = 0; i < 6; i++) {
         server.send(i + "\n"); // \n to make sure these are separate lines
         Thread.sleep(100);
       }
-      while (dataCounter.get() == 0 && System.currentTimeMillis() - startTime < timeout) {
+      while (dataCounter.get() == 0 && System.nanoTime() - startTimeNs < timeout) {
         Thread.sleep(100);
       }
       ssc.stop();
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 7376741..e81cfb5 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -335,9 +335,9 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
 
       // Let the data from the receiver be received
       val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
-      val startTime = System.currentTimeMillis()
+      val startTimeNs = System.nanoTime()
       while ((!MultiThreadTestReceiver.haveAllThreadsFinished || output.sum < numTotalRecords) &&
-        System.currentTimeMillis() - startTime < 5000) {
+        System.nanoTime() - startTimeNs < TimeUnit.SECONDS.toNanos(5)) {
         Thread.sleep(100)
         clock.advance(batchDuration.milliseconds)
       }
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala b/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala
index fff2d6f..cf8dd10 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala
@@ -20,6 +20,7 @@ package org.apache.spark.streaming
 import java.io.{File, IOException}
 import java.nio.charset.StandardCharsets
 import java.util.UUID
+import java.util.concurrent.TimeUnit
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
@@ -234,12 +235,12 @@ object MasterFailureTest extends Logging {
         System.clearProperty("spark.streaming.clock")
         System.clearProperty("spark.driver.port")
         ssc.start()
-        val startTime = System.currentTimeMillis()
+        val startTimeNs = System.nanoTime()
         while (!killed && !isLastOutputGenerated && !isTimedOut) {
           Thread.sleep(100)
-          timeRan = System.currentTimeMillis() - startTime
+          timeRan = System.nanoTime() - startTimeNs
           isLastOutputGenerated = (output.nonEmpty && output.last == lastExpectedOutput)
-          isTimedOut = (timeRan + totalTimeRan > maxTimeToRun)
+          isTimedOut = (timeRan + totalTimeRan > TimeUnit.MILLISECONDS.toNanos(maxTimeToRun))
         }
       } catch {
         case e: Exception => logError("Error running streaming context", e)
@@ -265,7 +266,7 @@ object MasterFailureTest extends Logging {
       logInfo("New output = " + output.toSeq)
       logInfo("Merged output = " + mergedOutput)
       logInfo("Time ran = " + timeRan)
-      logInfo("Total time ran = " + totalTimeRan)
+      logInfo("Total time ran = " + TimeUnit.NANOSECONDS.toMillis(totalTimeRan))
 
       if (!isLastOutputGenerated && !isTimedOut) {
         val sleepTime = Random.nextInt(batchDuration.milliseconds.toInt * 10)
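
In MasterFailureTest both timeRan and totalTimeRan now hold nanoseconds, so the timeout is converted to nanoseconds for the comparison and the accumulated total is converted back to milliseconds for the "Total time ran" log line. When accumulating durations across runs, keeping everything in one unit until the log or comparison boundary avoids mixed-unit bugs; a small hedged sketch:

    import java.util.concurrent.TimeUnit

    object AccumulatedRuntimeSketch {
      val maxTimeToRunMs = 60000L
      private var totalTimeRanNs = 0L // accumulate in nanoseconds only

      // Runs one unit of (simulated) work and reports whether the budget is spent.
      def runOnce(workMs: Long): Boolean = {
        val startNs = System.nanoTime()
        Thread.sleep(workMs) // stand-in for one streaming run
        totalTimeRanNs += System.nanoTime() - startNs
        println(s"Total time ran = ${TimeUnit.NANOSECONDS.toMillis(totalTimeRanNs)} ms")
        totalTimeRanNs > TimeUnit.MILLISECONDS.toNanos(maxTimeToRunMs)
      }
    }
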
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
index 9d1203b..130e981 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.streaming
 
 import java.io.File
 import java.nio.ByteBuffer
-import java.util.concurrent.Semaphore
+import java.util.concurrent.{Semaphore, TimeUnit}
 
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
@@ -147,10 +147,10 @@ class ReceiverSuite extends TestSuiteBase with TimeLimits with Serializable {
     val generatedData = new ArrayBuffer[Int]
 
     // Generate blocks
-    val startTime = System.currentTimeMillis()
+    val startTimeNs = System.nanoTime()
     blockGenerator.start()
     var count = 0
-    while(System.currentTimeMillis - startTime < waitTime) {
+    while(System.nanoTime() - startTimeNs < TimeUnit.MILLISECONDS.toNanos(waitTime)) {
       blockGenerator.addData(count)
       generatedData += count
       count += 1
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index 6a0f523..f2ae778 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.streaming
 
 import java.io.{File, IOException, ObjectInputStream}
-import java.util.concurrent.ConcurrentLinkedQueue
+import java.util.concurrent.{ConcurrentLinkedQueue, TimeUnit}
 
 import scala.collection.JavaConverters._
 import scala.language.implicitConversions
@@ -423,13 +423,13 @@ trait TestSuiteBase extends SparkFunSuite with BeforeAndAfter with Logging {
       logInfo("Manual clock after advancing = " + clock.getTimeMillis())
 
       // Wait until expected number of output items have been generated
-      val startTime = System.currentTimeMillis()
+      val startTimeNs = System.nanoTime()
       while (output.size < numExpectedOutput &&
-        System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
+        TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNs) < maxWaitTimeMillis) {
         logInfo("output.size = " + output.size + ", numExpectedOutput = " + numExpectedOutput)
         ssc.awaitTerminationOrTimeout(50)
       }
-      val timeTaken = System.currentTimeMillis() - startTime
+      val timeTaken = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNs)
       logInfo("Output generated in " + timeTaken + " milliseconds")
       output.asScala.foreach(x => logInfo("[" + x.mkString(",") + "]"))
       assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms")

