You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2016/08/22 22:58:35 UTC

kafka git commit: KAFKA-4051: Use nanosecond clock for timers in broker

Repository: kafka
Updated Branches:
  refs/heads/trunk d903babb7 -> dedacd06e


KAFKA-4051: Use nanosecond clock for timers in broker

Use the nanosecond clock (Time.nanoseconds, exposed in milliseconds as Time.hiResClockMs) instead of System.currentTimeMillis in broker timer tasks, so that timers cope with changes to wall-clock time.

Author: Rajini Sivaram <ra...@googlemail.com>

Reviewers: Gwen Shapira

Closes #1768 from rajinisivaram/KAFKA-4051


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/dedacd06
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/dedacd06
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/dedacd06

Branch: refs/heads/trunk
Commit: dedacd06e4d1e967261b9bca3e32ba0e44b52ba1
Parents: d903bab
Author: Rajini Sivaram <ra...@googlemail.com>
Authored: Mon Aug 22 15:58:32 2016 -0700
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Mon Aug 22 15:58:32 2016 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/utils/Time.scala                | 4 ++++
 core/src/main/scala/kafka/utils/timer/Timer.scala         | 5 +++--
 core/src/main/scala/kafka/utils/timer/TimerTaskList.scala | 2 +-
 3 files changed, 8 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/dedacd06/core/src/main/scala/kafka/utils/Time.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Time.scala b/core/src/main/scala/kafka/utils/Time.scala
index 194cc1f..f562ef7 100644
--- a/core/src/main/scala/kafka/utils/Time.scala
+++ b/core/src/main/scala/kafka/utils/Time.scala
@@ -17,6 +17,8 @@
 
 package kafka.utils
 
+import java.util.concurrent.TimeUnit
+
 /**
  * Some common constants
  */
@@ -44,6 +46,8 @@ trait Time {
 
   def nanoseconds: Long
 
+  def hiResClockMs: Long = TimeUnit.NANOSECONDS.toMillis(nanoseconds)
+
   def sleep(ms: Long)
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/dedacd06/core/src/main/scala/kafka/utils/timer/Timer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/timer/Timer.scala b/core/src/main/scala/kafka/utils/timer/Timer.scala
index 2d78665..67de276 100644
--- a/core/src/main/scala/kafka/utils/timer/Timer.scala
+++ b/core/src/main/scala/kafka/utils/timer/Timer.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock
 
 import kafka.utils.threadsafe
 import org.apache.kafka.common.utils.Utils
+import kafka.utils.SystemTime
 
 trait Timer {
   /**
@@ -55,7 +56,7 @@ trait Timer {
 class SystemTimer(executorName: String,
                   tickMs: Long = 1,
                   wheelSize: Int = 20,
-                  startMs: Long = System.currentTimeMillis) extends Timer {
+                  startMs: Long = SystemTime.hiResClockMs) extends Timer {
 
   // timeout timer
   private[this] val taskExecutor = Executors.newFixedThreadPool(1, new ThreadFactory() {
@@ -81,7 +82,7 @@ class SystemTimer(executorName: String,
   def add(timerTask: TimerTask): Unit = {
     readLock.lock()
     try {
-      addTimerTaskEntry(new TimerTaskEntry(timerTask, timerTask.delayMs + System.currentTimeMillis()))
+      addTimerTaskEntry(new TimerTaskEntry(timerTask, timerTask.delayMs + SystemTime.hiResClockMs))
     } finally {
       readLock.unlock()
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/dedacd06/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala b/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala
index e862f4f..7a77b27 100644
--- a/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala
+++ b/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala
@@ -117,7 +117,7 @@ private[timer] class TimerTaskList(taskCounter: AtomicInteger) extends Delayed {
   }
 
   def getDelay(unit: TimeUnit): Long = {
-    unit.convert(max(getExpiration - SystemTime.milliseconds, 0), TimeUnit.MILLISECONDS)
+    unit.convert(max(getExpiration - SystemTime.hiResClockMs, 0), TimeUnit.MILLISECONDS)
   }
 
   def compareTo(d: Delayed): Int = {