You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2016/08/22 22:58:35 UTC
kafka git commit: KAFKA-4051: Use nanosecond clock for timers in
broker
Repository: kafka
Updated Branches:
refs/heads/trunk d903babb7 -> dedacd06e
KAFKA-4051: Use nanosecond clock for timers in broker
Use System.nanoseconds instead of System.currentTimeMillis in broker timer tasks to cope with changes to wall-clock time.
Author: Rajini Sivaram <ra...@googlemail.com>
Reviewers: Gwen Shapira
Closes #1768 from rajinisivaram/KAFKA-4051
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/dedacd06
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/dedacd06
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/dedacd06
Branch: refs/heads/trunk
Commit: dedacd06e4d1e967261b9bca3e32ba0e44b52ba1
Parents: d903bab
Author: Rajini Sivaram <ra...@googlemail.com>
Authored: Mon Aug 22 15:58:32 2016 -0700
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Mon Aug 22 15:58:32 2016 -0700
----------------------------------------------------------------------
core/src/main/scala/kafka/utils/Time.scala | 4 ++++
core/src/main/scala/kafka/utils/timer/Timer.scala | 5 +++--
core/src/main/scala/kafka/utils/timer/TimerTaskList.scala | 2 +-
3 files changed, 8 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/dedacd06/core/src/main/scala/kafka/utils/Time.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Time.scala b/core/src/main/scala/kafka/utils/Time.scala
index 194cc1f..f562ef7 100644
--- a/core/src/main/scala/kafka/utils/Time.scala
+++ b/core/src/main/scala/kafka/utils/Time.scala
@@ -17,6 +17,8 @@
package kafka.utils
+import java.util.concurrent.TimeUnit
+
/**
* Some common constants
*/
@@ -44,6 +46,8 @@ trait Time {
def nanoseconds: Long
+ def hiResClockMs: Long = TimeUnit.NANOSECONDS.toMillis(nanoseconds)
+
def sleep(ms: Long)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/dedacd06/core/src/main/scala/kafka/utils/timer/Timer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/timer/Timer.scala b/core/src/main/scala/kafka/utils/timer/Timer.scala
index 2d78665..67de276 100644
--- a/core/src/main/scala/kafka/utils/timer/Timer.scala
+++ b/core/src/main/scala/kafka/utils/timer/Timer.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock
import kafka.utils.threadsafe
import org.apache.kafka.common.utils.Utils
+import kafka.utils.SystemTime
trait Timer {
/**
@@ -55,7 +56,7 @@ trait Timer {
class SystemTimer(executorName: String,
tickMs: Long = 1,
wheelSize: Int = 20,
- startMs: Long = System.currentTimeMillis) extends Timer {
+ startMs: Long = SystemTime.hiResClockMs) extends Timer {
// timeout timer
private[this] val taskExecutor = Executors.newFixedThreadPool(1, new ThreadFactory() {
@@ -81,7 +82,7 @@ class SystemTimer(executorName: String,
def add(timerTask: TimerTask): Unit = {
readLock.lock()
try {
- addTimerTaskEntry(new TimerTaskEntry(timerTask, timerTask.delayMs + System.currentTimeMillis()))
+ addTimerTaskEntry(new TimerTaskEntry(timerTask, timerTask.delayMs + SystemTime.hiResClockMs))
} finally {
readLock.unlock()
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/dedacd06/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala b/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala
index e862f4f..7a77b27 100644
--- a/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala
+++ b/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala
@@ -117,7 +117,7 @@ private[timer] class TimerTaskList(taskCounter: AtomicInteger) extends Delayed {
}
def getDelay(unit: TimeUnit): Long = {
- unit.convert(max(getExpiration - SystemTime.milliseconds, 0), TimeUnit.MILLISECONDS)
+ unit.convert(max(getExpiration - SystemTime.hiResClockMs, 0), TimeUnit.MILLISECONDS)
}
def compareTo(d: Delayed): Int = {