Posted to commits@kafka.apache.org by ij...@apache.org on 2016/12/02 14:01:33 UTC

[2/3] kafka git commit: KAFKA-2247; Merge kafka.utils.Time and kafka.common.utils.Time

http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/main/scala/kafka/utils/DelayedItem.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/DelayedItem.scala b/core/src/main/scala/kafka/utils/DelayedItem.scala
index cbab2a0..9d92b97 100644
--- a/core/src/main/scala/kafka/utils/DelayedItem.scala
+++ b/core/src/main/scala/kafka/utils/DelayedItem.scala
@@ -18,11 +18,14 @@
 package kafka.utils
 
 import java.util.concurrent._
+
+import org.apache.kafka.common.utils.Time
+
 import scala.math._
 
 class DelayedItem(delayMs: Long) extends Delayed with Logging {
 
-  private val dueMs = SystemTime.milliseconds + delayMs
+  private val dueMs = Time.SYSTEM.milliseconds + delayMs
 
   def this(delay: Long, unit: TimeUnit) = this(unit.toMillis(delay))
 
@@ -30,15 +33,12 @@ class DelayedItem(delayMs: Long) extends Delayed with Logging {
    * The remaining delay time
    */
   def getDelay(unit: TimeUnit): Long = {
-    unit.convert(max(dueMs - SystemTime.milliseconds, 0), TimeUnit.MILLISECONDS)
+    unit.convert(max(dueMs - Time.SYSTEM.milliseconds, 0), TimeUnit.MILLISECONDS)
   }
 
   def compareTo(d: Delayed): Int = {
     val other = d.asInstanceOf[DelayedItem]
-
-    if(dueMs < other.dueMs) -1
-    else if(dueMs > other.dueMs) 1
-    else 0
+    java.lang.Long.compare(dueMs, other.dueMs)
   }
 
 }
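
An editorial note on the hunk above: besides the SystemTime -> Time.SYSTEM rename, compareTo collapses into java.lang.Long.compare, which yields the same negative/zero/positive contract as the removed if/else chain in a single expression and avoids the overflow trap of returning a subtraction. A minimal standalone sketch of the pattern (hypothetical class name, plain System.currentTimeMillis in place of Kafka's Time abstraction):

    import java.util.concurrent.{Delayed, TimeUnit}

    // Hypothetical sketch of the same Delayed pattern: fix an absolute
    // deadline once, then derive the remaining delay and the ordering from it.
    class DeadlineItem(delayMs: Long) extends Delayed {
      private val dueMs = System.currentTimeMillis + delayMs

      override def getDelay(unit: TimeUnit): Long =
        unit.convert(math.max(dueMs - System.currentTimeMillis, 0), TimeUnit.MILLISECONDS)

      // Long.compare(a, b) is safe even where (a - b) would overflow.
      override def compareTo(d: Delayed): Int =
        java.lang.Long.compare(dueMs, d.asInstanceOf[DeadlineItem].dueMs)
    }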

http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala b/core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala
index 220b6e1..e3d389b 100644
--- a/core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala
+++ b/core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala
@@ -22,10 +22,10 @@ import java.io.IOException
 import org.apache.kafka.clients.{ClientRequest, ClientResponse, NetworkClient}
 import org.apache.kafka.common.Node
 import org.apache.kafka.common.requests.AbstractRequest
+import org.apache.kafka.common.utils.Time
 
 import scala.annotation.tailrec
 import scala.collection.JavaConverters._
-import org.apache.kafka.common.utils.{Time => JTime}
 
 object NetworkClientBlockingOps {
   implicit def networkClientBlockingOps(client: NetworkClient): NetworkClientBlockingOps =
@@ -52,7 +52,7 @@ class NetworkClientBlockingOps(val client: NetworkClient) extends AnyVal {
     * This method can be used to check the status of a connection prior to calling `blockingReady` to be able
     * to tell whether the latter completed a new connection.
     */
-  def isReady(node: Node)(implicit time: JTime): Boolean = {
+  def isReady(node: Node)(implicit time: Time): Boolean = {
     val currentTime = time.milliseconds()
     client.poll(0, currentTime)
     client.isReady(node, currentTime)
@@ -70,7 +70,7 @@ class NetworkClientBlockingOps(val client: NetworkClient) extends AnyVal {
    * This method is useful for implementing blocking behaviour on top of the non-blocking `NetworkClient`, use it with
    * care.
    */
-  def blockingReady(node: Node, timeout: Long)(implicit time: JTime): Boolean = {
+  def blockingReady(node: Node, timeout: Long)(implicit time: Time): Boolean = {
     require(timeout >=0, "timeout should be >= 0")
 
     val startTime = time.milliseconds()
@@ -103,7 +103,7 @@ class NetworkClientBlockingOps(val client: NetworkClient) extends AnyVal {
    * This method is useful for implementing blocking behaviour on top of the non-blocking `NetworkClient`, use it with
    * care.
    */
-  def blockingSendAndReceive(request: ClientRequest, body: AbstractRequest)(implicit time: JTime): ClientResponse = {
+  def blockingSendAndReceive(request: ClientRequest, body: AbstractRequest)(implicit time: Time): ClientResponse = {
     client.send(request, time.milliseconds())
 
     pollContinuously { responses =>
@@ -126,7 +126,7 @@ class NetworkClientBlockingOps(val client: NetworkClient) extends AnyVal {
     * This method is useful for implementing blocking behaviour on top of the non-blocking `NetworkClient`, use it with
     * care.
     */
-  private def pollContinuously[T](collect: Seq[ClientResponse] => Option[T])(implicit time: JTime): T = {
+  private def pollContinuously[T](collect: Seq[ClientResponse] => Option[T])(implicit time: Time): T = {
 
     @tailrec
     def recursivePoll: T = {
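
The removed JTime alias existed only because kafka.utils.Time used to shadow the client-side interface; after the merge the plain name imports cleanly. The implicit time: Time parameter is the pattern worth noting: callers bind one clock in implicit scope (Time.SYSTEM in production, a mock in tests) and every blocking helper picks it up without threading it explicitly. A sketch with a hypothetical helper:

    import org.apache.kafka.common.utils.Time

    object BlockingOpsSketch {
      // Hypothetical helper in the style of blockingReady: polls until a
      // deadline, reading the clock through the implicitly supplied Time.
      def waitUntil(condition: () => Boolean, timeoutMs: Long)(implicit time: Time): Boolean = {
        val deadlineMs = time.milliseconds() + timeoutMs
        while (!condition() && time.milliseconds() < deadlineMs)
          time.sleep(1)
        condition()
      }

      def main(args: Array[String]): Unit = {
        implicit val time: Time = Time.SYSTEM   // bound once, used by every call below
        println(waitUntil(() => true, timeoutMs = 100))
      }
    }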

http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/main/scala/kafka/utils/Throttler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Throttler.scala b/core/src/main/scala/kafka/utils/Throttler.scala
index 998ade1..e781cd6 100644
--- a/core/src/main/scala/kafka/utils/Throttler.scala
+++ b/core/src/main/scala/kafka/utils/Throttler.scala
@@ -18,8 +18,11 @@
 package kafka.utils
 
 import kafka.metrics.KafkaMetricsGroup
+import org.apache.kafka.common.utils.Time
+
 import java.util.concurrent.TimeUnit
 import java.util.Random
+
 import scala.math._
 
 /**
@@ -33,19 +36,23 @@ import scala.math._
  * @param time: The time implementation to use
  */
 @threadsafe
-class Throttler(val desiredRatePerSec: Double, 
-                val checkIntervalMs: Long = 100L, 
-                val throttleDown: Boolean = true,
+class Throttler(desiredRatePerSec: Double,
+                checkIntervalMs: Long = 100L,
+                throttleDown: Boolean = true,
                 metricName: String = "throttler",
                 units: String = "entries",
-                val time: Time = SystemTime) extends Logging with KafkaMetricsGroup {
+                time: Time = Time.SYSTEM) extends Logging with KafkaMetricsGroup {
   
   private val lock = new Object
   private val meter = newMeter(metricName, units, TimeUnit.SECONDS)
+  private val checkIntervalNs = TimeUnit.MILLISECONDS.toNanos(checkIntervalMs)
   private var periodStartNs: Long = time.nanoseconds
   private var observedSoFar: Double = 0.0
   
   def maybeThrottle(observed: Double) {
+    val msPerSec = TimeUnit.SECONDS.toMillis(1)
+    val nsPerSec = TimeUnit.SECONDS.toNanos(1)
+
     meter.mark(observed.toLong)
     lock synchronized {
       observedSoFar += observed
@@ -53,15 +60,15 @@ class Throttler(val desiredRatePerSec: Double,
       val elapsedNs = now - periodStartNs
       // if we have completed an interval AND we have observed something, maybe
       // we should take a little nap
-      if(elapsedNs > checkIntervalMs * Time.NsPerMs && observedSoFar > 0) {
-        val rateInSecs = (observedSoFar * Time.NsPerSec) / elapsedNs
+      if (elapsedNs > checkIntervalNs && observedSoFar > 0) {
+        val rateInSecs = (observedSoFar * nsPerSec) / elapsedNs
         val needAdjustment = !(throttleDown ^ (rateInSecs > desiredRatePerSec))
-        if(needAdjustment) {
+        if (needAdjustment) {
           // solve for the amount of time to sleep to make us hit the desired rate
-          val desiredRateMs = desiredRatePerSec / Time.MsPerSec.toDouble
-          val elapsedMs = elapsedNs / Time.NsPerMs
+          val desiredRateMs = desiredRatePerSec / msPerSec.toDouble
+          val elapsedMs = TimeUnit.NANOSECONDS.toMillis(elapsedNs)
           val sleepTime = round(observedSoFar / desiredRateMs - elapsedMs)
-          if(sleepTime > 0) {
+          if (sleepTime > 0) {
             trace("Natural rate is %f per second but desired rate is %f, sleeping for %d ms to compensate.".format(rateInSecs, desiredRatePerSec, sleepTime))
             time.sleep(sleepTime)
           }
@@ -71,14 +78,14 @@ class Throttler(val desiredRatePerSec: Double,
       }
     }
   }
-  
+
 }
 
 object Throttler {
   
   def main(args: Array[String]) {
     val rand = new Random()
-    val throttler = new Throttler(100000, 100, true, time = SystemTime)
+    val throttler = new Throttler(100000, 100, true, time = Time.SYSTEM)
     val interval = 30000
     var start = System.currentTimeMillis
     var total = 0
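
The Throttler rewrite drops the unit constants from the soon-to-be-deleted kafka.utils.Time object in favor of TimeUnit conversions computed at the point of use. The mapping, using the values defined in the deleted file further down:

    import java.util.concurrent.TimeUnit

    object TimeUnitMapping {
      val nsPerMs  = TimeUnit.MILLISECONDS.toNanos(1) // was Time.NsPerMs  = 1000000
      val nsPerSec = TimeUnit.SECONDS.toNanos(1)      // was Time.NsPerSec = 1000000000
      val msPerSec = TimeUnit.SECONDS.toMillis(1)     // was Time.MsPerSec = 1000

      def main(args: Array[String]): Unit = {
        // Elapsed-time conversions become explicit, self-documenting calls:
        val elapsedNs = 2500000L
        println(TimeUnit.NANOSECONDS.toMillis(elapsedNs)) // 2, was elapsedNs / Time.NsPerMs
      }
    }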

http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/main/scala/kafka/utils/Time.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Time.scala b/core/src/main/scala/kafka/utils/Time.scala
deleted file mode 100644
index d578a6a..0000000
--- a/core/src/main/scala/kafka/utils/Time.scala
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * 
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.utils
-
-import java.util.concurrent.TimeUnit
-
-/**
- * Some common constants
- */
-object Time {
-  val NsPerUs = 1000
-  val UsPerMs = 1000
-  val MsPerSec = 1000
-  val NsPerMs = NsPerUs * UsPerMs
-  val NsPerSec = NsPerMs * MsPerSec
-  val UsPerSec = UsPerMs * MsPerSec
-  val SecsPerMin = 60
-  val MinsPerHour = 60
-  val HoursPerDay = 24
-  val SecsPerHour = SecsPerMin * MinsPerHour
-  val SecsPerDay = SecsPerHour * HoursPerDay
-  val MinsPerDay = MinsPerHour * HoursPerDay
-}
-
-/**
- * A mockable interface for time functions
- */
-trait Time extends org.apache.kafka.common.utils.Time {
-  
-  def milliseconds: Long
-
-  def nanoseconds: Long
-
-  def hiResClockMs: Long = TimeUnit.NANOSECONDS.toMillis(nanoseconds)
-
-  def sleep(ms: Long)
-}
-
-/**
- * The normal system implementation of time functions
- */
-object SystemTime extends Time {
-  
-  def milliseconds: Long = System.currentTimeMillis
-  
-  def nanoseconds: Long = System.nanoTime
-  
-  def sleep(ms: Long): Unit = Thread.sleep(ms)
-  
-}
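
With this deletion a single Time abstraction serves both core and clients. Judging from the call sites elsewhere in this patch, the merged org.apache.kafka.common.utils.Time keeps milliseconds, nanoseconds, sleep and the hiResClockMs helper (defined above as nanoseconds truncated to milliseconds), with Time.SYSTEM replacing the SystemTime object. A usage sketch under those assumptions:

    import org.apache.kafka.common.utils.Time

    object MergedTimeSketch {
      // Times a block using the monotonic hiResClockMs reading.
      def timed[A](body: => A)(implicit time: Time): (A, Long) = {
        val start = time.hiResClockMs
        val result = body
        (result, time.hiResClockMs - start)
      }

      def main(args: Array[String]): Unit = {
        implicit val time: Time = Time.SYSTEM // the singleton replacing kafka.utils.SystemTime
        val (_, elapsedMs) = timed(time.sleep(5))
        println(s"slept for ~${elapsedMs}ms")
      }
    }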

http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index de56fe2..fcb5648 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -32,6 +32,7 @@ import org.I0Itec.zkclient.serialize.ZkSerializer
 import org.I0Itec.zkclient.{ZkClient, ZkConnection}
 import org.apache.kafka.common.config.ConfigException
 import org.apache.kafka.common.protocol.SecurityProtocol
+import org.apache.kafka.common.utils.Time
 import org.apache.zookeeper.AsyncCallback.{DataCallback, StringCallback}
 import org.apache.zookeeper.KeeperException.Code
 import org.apache.zookeeper.data.{ACL, Stat}
@@ -364,7 +365,7 @@ class ZkUtils(val zkClient: ZkClient,
                          rack: Option[String],
                          apiVersion: ApiVersion) {
     val brokerIdPath = BrokerIdsPath + "/" + id
-    val timestamp = SystemTime.milliseconds.toString
+    val timestamp = Time.SYSTEM.milliseconds.toString
 
     val version = if (apiVersion >= KAFKA_0_10_0_IV1) 3 else 2
     var jsonMap = Map("version" -> version,

http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/main/scala/kafka/utils/timer/Timer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/timer/Timer.scala b/core/src/main/scala/kafka/utils/timer/Timer.scala
index 67de276..0538271 100644
--- a/core/src/main/scala/kafka/utils/timer/Timer.scala
+++ b/core/src/main/scala/kafka/utils/timer/Timer.scala
@@ -21,8 +21,7 @@ import java.util.concurrent.atomic.AtomicInteger
 import java.util.concurrent.locks.ReentrantReadWriteLock
 
 import kafka.utils.threadsafe
-import org.apache.kafka.common.utils.Utils
-import kafka.utils.SystemTime
+import org.apache.kafka.common.utils.{Time, Utils}
 
 trait Timer {
   /**
@@ -56,7 +55,7 @@ trait Timer {
 class SystemTimer(executorName: String,
                   tickMs: Long = 1,
                   wheelSize: Int = 20,
-                  startMs: Long = SystemTime.hiResClockMs) extends Timer {
+                  startMs: Long = Time.SYSTEM.hiResClockMs) extends Timer {
 
   // timeout timer
   private[this] val taskExecutor = Executors.newFixedThreadPool(1, new ThreadFactory() {
@@ -82,7 +81,7 @@ class SystemTimer(executorName: String,
   def add(timerTask: TimerTask): Unit = {
     readLock.lock()
     try {
-      addTimerTaskEntry(new TimerTaskEntry(timerTask, timerTask.delayMs + SystemTime.hiResClockMs))
+      addTimerTaskEntry(new TimerTaskEntry(timerTask, timerTask.delayMs + Time.SYSTEM.hiResClockMs))
     } finally {
       readLock.unlock()
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala b/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala
index 7a77b27..3dbfa8f 100644
--- a/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala
+++ b/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala
@@ -16,10 +16,11 @@
  */
 package kafka.utils.timer
 
-import java.util.concurrent.{TimeUnit, Delayed}
-import java.util.concurrent.atomic.{AtomicLong, AtomicInteger}
+import java.util.concurrent.{Delayed, TimeUnit}
+import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
 
-import kafka.utils.{SystemTime, threadsafe}
+import kafka.utils.threadsafe
+import org.apache.kafka.common.utils.Time
 
 import scala.math._
 
@@ -117,7 +118,7 @@ private[timer] class TimerTaskList(taskCounter: AtomicInteger) extends Delayed {
   }
 
   def getDelay(unit: TimeUnit): Long = {
-    unit.convert(max(getExpiration - SystemTime.hiResClockMs, 0), TimeUnit.MILLISECONDS)
+    unit.convert(max(getExpiration - Time.SYSTEM.hiResClockMs, 0), TimeUnit.MILLISECONDS)
   }
 
   def compareTo(d: Delayed): Int = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
index 6bd8e4f..6fef2b3 100755
--- a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
+++ b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
@@ -28,7 +28,7 @@ import kafka.message._
 
 import scala.math._
 import joptsimple._
-import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.utils.{Time, Utils}
 
 /**
  * This test does linear writes using either a kafka log or a file and measures throughput and latency.
@@ -201,7 +201,7 @@ object TestLinearWriteSpeed {
   
   class LogWritable(val dir: File, config: LogConfig, scheduler: Scheduler, val messages: ByteBufferMessageSet) extends Writable {
     Utils.delete(dir)
-    val log = new Log(dir, config, 0L, scheduler, SystemTime)
+    val log = new Log(dir, config, 0L, scheduler, Time.SYSTEM)
     def write(): Int = {
       log.append(messages, true)
       messages.sizeInBytes

http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala b/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala
index 7cb5b6e..7636d96 100644
--- a/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala
+++ b/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala
@@ -23,8 +23,9 @@ import java.util.Random
 import java.util.concurrent._
 
 import joptsimple._
-import kafka.server.{DelayedOperationPurgatory, DelayedOperation}
+import kafka.server.{DelayedOperation, DelayedOperationPurgatory}
 import kafka.utils._
+import org.apache.kafka.common.utils.Time
 
 import scala.math._
 import scala.collection.JavaConverters._
@@ -276,7 +277,7 @@ object TestPurgatoryPerformance {
 
     private class Scheduled(val operation: FakeOperation) extends Delayed {
       def getDelay(unit: TimeUnit): Long = {
-        unit.convert(max(operation.completesAt - SystemTime.milliseconds, 0), TimeUnit.MILLISECONDS)
+        unit.convert(max(operation.completesAt - Time.SYSTEM.milliseconds, 0), TimeUnit.MILLISECONDS)
       }
 
       def compareTo(d: Delayed): Int = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
index 16fe788..e93cae3 100644
--- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
+++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
@@ -17,17 +17,15 @@
 
 package kafka.api
 
-import kafka.cluster.{EndPoint, Broker}
+import kafka.cluster.{Broker, EndPoint}
 import kafka.common.{OffsetAndMetadata, OffsetMetadataAndError}
 import kafka.common._
-import kafka.message.{Message, ByteBufferMessageSet}
-import kafka.utils.SystemTime
-
+import kafka.message.{ByteBufferMessageSet, Message}
 import kafka.common.TopicAndPartition
-
 import java.nio.ByteBuffer
 
 import org.apache.kafka.common.protocol.{Errors, SecurityProtocol}
+import org.apache.kafka.common.utils.Time
 import org.junit._
 import org.scalatest.junit.JUnitSuite
 import org.junit.Assert._
@@ -112,7 +110,7 @@ object SerializationTestUtils {
   def createTestOffsetCommitRequestV2: OffsetCommitRequest = {
     new OffsetCommitRequest(
       groupId = "group 1",
-      retentionMs = SystemTime.milliseconds,
+      retentionMs = Time.SYSTEM.milliseconds,
       requestInfo=collection.immutable.Map(
       TopicAndPartition(topic1, 0) -> OffsetAndMetadata(42L, "some metadata"),
       TopicAndPartition(topic1, 1) -> OffsetAndMetadata(100L, OffsetMetadata.NoMetadata)
@@ -124,8 +122,8 @@ object SerializationTestUtils {
       versionId = 1,
       groupId = "group 1",
       requestInfo = collection.immutable.Map(
-      TopicAndPartition(topic1, 0) -> OffsetAndMetadata(42L, "some metadata", SystemTime.milliseconds),
-      TopicAndPartition(topic1, 1) -> OffsetAndMetadata(100L, OffsetMetadata.NoMetadata, SystemTime.milliseconds)
+      TopicAndPartition(topic1, 0) -> OffsetAndMetadata(42L, "some metadata", Time.SYSTEM.milliseconds),
+      TopicAndPartition(topic1, 1) -> OffsetAndMetadata(100L, OffsetMetadata.NoMetadata, Time.SYSTEM.milliseconds)
     ))
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala b/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala
index 8d48609..f7dd40f 100644
--- a/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala
+++ b/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala
@@ -51,7 +51,7 @@ class ZkNodeChangeNotificationListenerTest extends KafkaServerTestHarness {
     TestUtils.waitUntilTrue(() => invocationCount == 1 && notification == notificationMessage1, "failed to send/process notification message in the timeout period.")
 
    /*There is no easy way to test the purging. Even if we mock kafka time with MockTime, the purging compares kafka time with the time stored in the zookeeper stat, and the
-    embedded zookeeper server does not provide a way to mock time. So to test purging we will have to use SystemTime.sleep(changeExpirationMs + 1), issue a write, and check
+    embedded zookeeper server does not provide a way to mock time. So to test purging we will have to use Time.SYSTEM.sleep(changeExpirationMs + 1), issue a write, and check
    Assert.assertEquals(1, ZkUtils.getChildren(zkClient, seqNodeRoot).size); however, even after that the assertion can fail as the second node itself can be deleted
     depending on how threads get scheduled.*/
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
index 6430b33..63afd4e 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
@@ -20,14 +20,12 @@ package kafka.controller
 import java.util.Properties
 import java.util.concurrent.LinkedBlockingQueue
 
-import kafka.api.RequestOrResponse
 import kafka.common.TopicAndPartition
 import kafka.integration.KafkaServerTestHarness
 import kafka.server.{KafkaConfig, KafkaServer}
 import kafka.utils._
 import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.requests.{AbstractRequestResponse, AbstractRequest}
-import org.apache.kafka.common.utils.SystemTime
+import org.apache.kafka.common.utils.Time
 import org.apache.log4j.{Level, Logger}
 import org.junit.{After, Before, Test}
 
@@ -150,7 +148,7 @@ class ControllerFailoverTest extends KafkaServerTestHarness with Logging {
 }
 
 class MockChannelManager(private val controllerContext: ControllerContext, config: KafkaConfig, metrics: Metrics)
-  extends ControllerChannelManager(controllerContext, config, new SystemTime, metrics) {
+  extends ControllerChannelManager(controllerContext, config, Time.SYSTEM, metrics) {
 
   def stopSendThread(brokerId: Int) {
     val requestThread = brokerStateInfo(brokerId).requestSendThread
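
A mechanical change that recurs through the test files below: per-call-site `new SystemTime` allocations become the shared Time.SYSTEM singleton. The system clock is stateless, so one instance can serve every component; a sketch with a hypothetical class:

    import org.apache.kafka.common.utils.Time

    // Hypothetical component taking its clock as a constructor dependency,
    // mirroring ControllerChannelManager and SocketServer in this patch.
    class Clocked(time: Time) {
      def nowMs: Long = time.milliseconds()
    }

    object ClockedExample {
      def main(args: Array[String]): Unit = {
        val a = new Clocked(Time.SYSTEM) // before this patch: new Clocked(new SystemTime)
        val b = new Clocked(Time.SYSTEM) // the singleton is stateless, so sharing is safe
        println(a.nowMs <= b.nowMs)
      }
    }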

http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
index a050bb3..791bdb0 100755
--- a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
+++ b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
@@ -35,7 +35,7 @@ class BrokerCompressionTest(messageCompression: String, brokerCompression: Strin
 
   val tmpDir = TestUtils.tempDir()
   val logDir = TestUtils.randomPartitionLogDir(tmpDir)
-  val time = new MockTime(0)
+  val time = new MockTime(0, 0)
   val logConfig = LogConfig()
 
   @After

http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
index 36c61d6..5e029fc 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
@@ -44,7 +44,7 @@ class LogCleanerLagIntegrationTest(compressionCodecName: String) extends Logging
   val compactionLag = 1 * msPerHour
   assertTrue("compactionLag must be divisible by 2 for this test", compactionLag % 2 == 0)
 
-  val time = new MockTime(1400000000000L)  // Tue May 13 16:53:20 UTC 2014
+  val time = new MockTime(1400000000000L, 1000L)  // Tue May 13 16:53:20 UTC 2014 for `currentTimeMs`
   val cleanerBackOffMs = 200L
   val segmentSize = 100
   val deleteDelay = 1000

http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
index 6e5806f..0cd52d6 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
@@ -40,7 +40,7 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
   logProps.put(LogConfig.SegmentIndexBytesProp, 1024: java.lang.Integer)
   logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
   val logConfig = LogConfig(logProps)
-  val time = new MockTime(1400000000000L)  // Tue May 13 16:53:20 UTC 2014
+  val time = new MockTime(1400000000000L, 1000L)  // Tue May 13 16:53:20 UTC 2014 for `currentTimeMs`
 
   @After
   def tearDown(): Unit = {
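
In the two hunks above (and BrokerCompressionTest before them) the kafka.utils.MockTime constructor gains a second argument. The first seeds the wall clock (the comments call it currentTimeMs); the second presumably seeds the high-resolution clock behind hiResClockMs, letting the two start from unrelated origins. A sketch under that assumed signature:

    import kafka.utils.MockTime

    object MockTimeSketch {
      def main(args: Array[String]): Unit = {
        // Assumed signature: MockTime(currentTimeMs, currentHiResTimeMs).
        val time = new MockTime(1400000000000L, 1000L)
        println(time.milliseconds)  // deterministic: starts at the seeded wall-clock value
        time.sleep(500)             // a mock clock advances virtually instead of blocking
        println(time.milliseconds)  // 1400000000500
      }
    }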

http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
index 7f78148..f02c5cb 100644
--- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -23,10 +23,9 @@ import kafka.common.LongRef
 import org.junit.{After, Test}
 import kafka.utils.TestUtils
 import kafka.message._
-import kafka.utils.SystemTime
+import org.apache.kafka.common.utils.Time
 
 import scala.collection._
- import scala.collection.mutable.ListBuffer
 
 class LogSegmentTest {
   
@@ -42,7 +41,7 @@ class LogSegmentTest {
     timeIdxFile.delete()
     val idx = new OffsetIndex(idxFile, offset, 1000)
     val timeIdx = new TimeIndex(timeIdxFile, offset, 1500)
-    val seg = new LogSegment(ms, idx, timeIdx, offset, indexIntervalBytes, 0, SystemTime)
+    val seg = new LogSegment(ms, idx, timeIdx, offset, indexIntervalBytes, 0, Time.SYSTEM)
     segments += seg
     seg
   }
@@ -296,7 +295,7 @@ class LogSegmentTest {
   /* create a segment with   pre allocate */
   def createSegment(offset: Long, fileAlreadyExists: Boolean, initFileSize: Int, preallocate: Boolean): LogSegment = {
     val tempDir = TestUtils.tempDir()
-    val seg = new LogSegment(tempDir, offset, 10, 1000, 0, SystemTime, fileAlreadyExists = fileAlreadyExists, initFileSize = initFileSize, preallocate = preallocate)
+    val seg = new LogSegment(tempDir, offset, 10, 1000, 0, Time.SYSTEM, fileAlreadyExists = fileAlreadyExists, initFileSize = initFileSize, preallocate = preallocate)
     segments += seg
     seg
   }
@@ -317,7 +316,7 @@ class LogSegmentTest {
   @Test
   def testCreateWithInitFileSizeClearShutdown() {
     val tempDir = TestUtils.tempDir()
-    val seg = new LogSegment(tempDir, 40, 10, 1000, 0, SystemTime, false, 512*1024*1024, true)
+    val seg = new LogSegment(tempDir, 40, 10, 1000, 0, Time.SYSTEM, false, 512*1024*1024, true)
 
     val ms = messages(50, "hello", "there")
     seg.append(50, Message.NoTimestamp, -1L, ms)
@@ -333,7 +332,7 @@ class LogSegmentTest {
     //After close, file should be trimmed
     assertEquals(oldSize, seg.log.file.length)
 
-    val segReopen = new LogSegment(tempDir, 40, 10, 1000, 0, SystemTime, true,  512*1024*1024, true)
+    val segReopen = new LogSegment(tempDir, 40, 10, 1000, 0, Time.SYSTEM, true,  512*1024*1024, true)
     segments += segReopen
 
     val readAgain = segReopen.read(startOffset = 55, maxSize = 200, maxOffset = None)

http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index 317f3d6..7d0764b 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -30,7 +30,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol}
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.requests.{ProduceRequest, RequestHeader}
-import org.apache.kafka.common.utils.SystemTime
+import org.apache.kafka.common.utils.Time
 import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
 import org.apache.kafka.common.record.MemoryRecords
@@ -52,7 +52,7 @@ class SocketServerTest extends JUnitSuite {
   props.put("connections.max.idle.ms", "60000")
   val config = KafkaConfig.fromProps(props)
   val metrics = new Metrics
-  val server = new SocketServer(config, metrics, new SystemTime)
+  val server = new SocketServer(config, metrics, Time.SYSTEM)
   server.startup()
   val sockets = new ArrayBuffer[Socket]
 
@@ -239,7 +239,7 @@ class SocketServerTest extends JUnitSuite {
     val overrideProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 0)
     overrideProps.put(KafkaConfig.MaxConnectionsPerIpOverridesProp, s"localhost:$overrideNum")
     val serverMetrics = new Metrics()
-    val overrideServer = new SocketServer(KafkaConfig.fromProps(overrideProps), serverMetrics, new SystemTime())
+    val overrideServer = new SocketServer(KafkaConfig.fromProps(overrideProps), serverMetrics, Time.SYSTEM)
     try {
       overrideServer.startup()
       // make the maximum allowable number of connections
@@ -269,7 +269,7 @@ class SocketServerTest extends JUnitSuite {
     overrideProps.put(KafkaConfig.ListenersProp, "SSL://localhost:0")
 
     val serverMetrics = new Metrics
-    val overrideServer = new SocketServer(KafkaConfig.fromProps(overrideProps), serverMetrics, new SystemTime)
+    val overrideServer = new SocketServer(KafkaConfig.fromProps(overrideProps), serverMetrics, Time.SYSTEM)
     try {
       overrideServer.startup()
       val sslContext = SSLContext.getInstance("TLSv1.2")
@@ -317,7 +317,7 @@ class SocketServerTest extends JUnitSuite {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 0)
     val serverMetrics = new Metrics
     var conn: Socket = null
-    val overrideServer = new SocketServer(KafkaConfig.fromProps(props), serverMetrics, new SystemTime) {
+    val overrideServer = new SocketServer(KafkaConfig.fromProps(props), serverMetrics, Time.SYSTEM) {
       override def newProcessor(id: Int, connectionQuotas: ConnectionQuotas, protocol: SecurityProtocol): Processor = {
         new Processor(id, time, config.socketRequestMaxBytes, requestChannel, connectionQuotas,
           config.connectionsMaxIdleMs, protocol, config.values, metrics) {
@@ -367,7 +367,7 @@ class SocketServerTest extends JUnitSuite {
     props.setProperty(KafkaConfig.ConnectionsMaxIdleMsProp, "100")
     val serverMetrics = new Metrics
     var conn: Socket = null
-    val overrideServer = new SocketServer(KafkaConfig.fromProps(props), serverMetrics, new SystemTime)
+    val overrideServer = new SocketServer(KafkaConfig.fromProps(props), serverMetrics, Time.SYSTEM)
     try {
       overrideServer.startup()
       conn = connect(overrideServer)

http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
index e84c498..f5943d6 100755
--- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
@@ -36,6 +36,7 @@ import kafka.utils.TestUtils._
 import scala.collection.Map
 import scala.collection.mutable.ArrayBuffer
 import kafka.utils._
+import org.apache.kafka.common.utils.Time
 
 @deprecated("This test has been deprecated and it will be removed in a future release.", "0.10.0.0")
 class AsyncProducerTest {
@@ -416,6 +417,7 @@ class AsyncProducerTest {
       override def nanoseconds: Long = 0L
       override def milliseconds: Long = 0L
       override def sleep(ms: Long): Unit = {}
+      override def hiResClockMs: Long = 0L
     }
     val handler = new DefaultEventHandler[Int,String](config,
                                                       partitioner = new FixedValuePartitioner(),
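
The extra override in this stub is the visible cost of the merge: hiResClockMs is now part of the common Time contract, so a hand-rolled implementation evidently has to provide it alongside milliseconds, nanoseconds and sleep. A frozen-clock stub in the same style:

    import org.apache.kafka.common.utils.Time

    object FrozenTime {
      // Every reading is constant and sleep is a no-op, which makes
      // latency-dependent code fully deterministic in tests.
      val instance: Time = new Time {
        override def milliseconds: Long = 0L
        override def nanoseconds: Long = 0L
        override def hiResClockMs: Long = 0L
        override def sleep(ms: Long): Unit = ()
      }
    }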

http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
index f4a339e..ec51e20 100755
--- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
@@ -32,6 +32,7 @@ import kafka.utils._
 import kafka.zk.ZooKeeperTestHarness
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.TimestampType
+import org.apache.kafka.common.utils.Time
 import org.apache.log4j.{Level, Logger}
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
@@ -316,7 +317,7 @@ class ProducerTest extends ZooKeeperTestHarness with Logging{
       // any requests should be accepted and queue up, but not handled
       server1.requestHandlerPool.shutdown()
 
-      val t1 = SystemTime.milliseconds
+      val t1 = Time.SYSTEM.milliseconds
       try {
         // this message should be assigned to partition 0 whose leader is on broker 0, but
         // broker 0 will not response within timeoutMs millis.
@@ -324,7 +325,7 @@ class ProducerTest extends ZooKeeperTestHarness with Logging{
       } catch {
         case _: FailedToSendMessageException => /* success */
       }
-      val t2 = SystemTime.milliseconds
+      val t2 = Time.SYSTEM.milliseconds
       // make sure we don't wait fewer than timeoutMs
       assertTrue((t2-t1) >= timeoutMs)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
index 7e72eec..d63afe7 100644
--- a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
@@ -28,6 +28,7 @@ import kafka.message._
 import kafka.server.KafkaConfig
 import kafka.utils._
 import org.apache.kafka.common.protocol.{Errors, SecurityProtocol}
+import org.apache.kafka.common.utils.Time
 import org.junit.Test
 import org.junit.Assert._
 
@@ -55,7 +56,7 @@ class SyncProducerTest extends KafkaServerTestHarness {
 
 
     val producer = new SyncProducer(new SyncProducerConfig(props))
-    val firstStart = SystemTime.milliseconds
+    val firstStart = Time.SYSTEM.milliseconds
     try {
       val response = producer.send(produceRequest("test", 0,
         new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)), acks = 1))
@@ -63,9 +64,9 @@ class SyncProducerTest extends KafkaServerTestHarness {
     } catch {
       case e: Exception => fail("Unexpected failure sending message to broker. " + e.getMessage)
     }
-    val firstEnd = SystemTime.milliseconds
+    val firstEnd = Time.SYSTEM.milliseconds
     assertTrue((firstEnd-firstStart) < 2000)
-    val secondStart = SystemTime.milliseconds
+    val secondStart = Time.SYSTEM.milliseconds
     try {
       val response = producer.send(produceRequest("test", 0,
         new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)), acks = 1))
@@ -73,7 +74,7 @@ class SyncProducerTest extends KafkaServerTestHarness {
     } catch {
       case e: Exception => fail("Unexpected failure sending message to broker. " + e.getMessage)
     }
-    val secondEnd = SystemTime.milliseconds
+    val secondEnd = Time.SYSTEM.milliseconds
     assertTrue((secondEnd-secondStart) < 2000)
     try {
       val response = producer.send(produceRequest("test", 0,
@@ -216,14 +217,14 @@ class SyncProducerTest extends KafkaServerTestHarness {
     // any requests should be accepted and queue up, but not handled
     server.requestHandlerPool.shutdown()
 
-    val t1 = SystemTime.milliseconds
+    val t1 = Time.SYSTEM.milliseconds
     try {
       producer.send(request)
       fail("Should have received timeout exception since request handling is stopped.")
     } catch {
       case _: SocketTimeoutException => /* success */
     }
-    val t2 = SystemTime.milliseconds
+    val t2 = Time.SYSTEM.milliseconds
     // make sure we don't wait fewer than timeoutMs for a response
     assertTrue((t2-t1) >= timeoutMs)
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
index ae0d12f..49ef9f6 100644
--- a/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
@@ -17,7 +17,7 @@
 
 package kafka.server
 
-import kafka.utils.SystemTime
+import org.apache.kafka.common.utils.Time
 import org.junit.{After, Before, Test}
 import org.junit.Assert._
 
@@ -55,13 +55,13 @@ class DelayedOperationTest {
   @Test
   def testRequestExpiry() {
     val expiration = 20L
-    val start = SystemTime.hiResClockMs
+    val start = Time.SYSTEM.hiResClockMs
     val r1 = new MockDelayedOperation(expiration)
     val r2 = new MockDelayedOperation(200000L)
     assertFalse("r1 not satisfied and hence watched", purgatory.tryCompleteElseWatch(r1, Array("test1")))
     assertFalse("r2 not satisfied and hence watched", purgatory.tryCompleteElseWatch(r2, Array("test2")))
     r1.awaitExpiration()
-    val elapsed = SystemTime.hiResClockMs - start
+    val elapsed = Time.SYSTEM.hiResClockMs - start
     assertTrue("r1 completed due to expiration", r1.isCompleted())
     assertFalse("r2 hasn't completed", r2.isCompleted())
     assertTrue(s"Time for expiration $elapsed should at least $expiration", elapsed >= expiration)
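
The elapsed-time measurement above now reads Time.SYSTEM.hiResClockMs instead of milliseconds. Because hiResClockMs derives from nanoseconds (System.nanoTime in the system implementation), it is monotonic and immune to wall-clock adjustments, which is what you want when asserting a lower bound on elapsed time. A sketch:

    import org.apache.kafka.common.utils.Time

    object ElapsedSketch {
      def main(args: Array[String]): Unit = {
        val start = Time.SYSTEM.hiResClockMs   // monotonic; the wall clock can step backwards
        Thread.sleep(20)
        val elapsedMs = Time.SYSTEM.hiResClockMs - start
        println(s"elapsed ~${elapsedMs}ms (never negative, even across clock adjustments)")
      }
    }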

http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
index e7e1554..358b2a4 100755
--- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
@@ -18,9 +18,8 @@ package kafka.server
 
 import kafka.log._
 import java.io.File
-import kafka.utils.SystemTime
 import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.utils.{MockTime => JMockTime, Utils}
+import org.apache.kafka.common.utils.Utils
 import org.easymock.EasyMock
 import org.junit._
 import org.junit.Assert._
@@ -55,9 +54,10 @@ class HighwatermarkPersistenceTest {
     val scheduler = new KafkaScheduler(2)
     scheduler.startup
     val metrics = new Metrics
+    val time = new MockTime
     // create replica manager
-    val replicaManager = new ReplicaManager(configs.head, metrics, new MockTime, new JMockTime, zkUtils, scheduler,
-      logManagers.head, new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, SystemTime).follower)
+    val replicaManager = new ReplicaManager(configs.head, metrics, time, zkUtils, scheduler,
+      logManagers.head, new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time).follower)
     replicaManager.startup()
     try {
       replicaManager.checkpointHighWatermarks()
@@ -66,9 +66,9 @@ class HighwatermarkPersistenceTest {
       val partition0 = replicaManager.getOrCreatePartition(topic, 0)
       // create leader and follower replicas
       val log0 = logManagers.head.createLog(TopicAndPartition(topic, 0), LogConfig())
-      val leaderReplicaPartition0 = new Replica(configs.head.brokerId, partition0, SystemTime, 0, Some(log0))
+      val leaderReplicaPartition0 = new Replica(configs.head.brokerId, partition0, time, 0, Some(log0))
       partition0.addReplicaIfNotExists(leaderReplicaPartition0)
-      val followerReplicaPartition0 = new Replica(configs.last.brokerId, partition0, SystemTime)
+      val followerReplicaPartition0 = new Replica(configs.last.brokerId, partition0, time)
       partition0.addReplicaIfNotExists(followerReplicaPartition0)
       replicaManager.checkpointHighWatermarks()
       fooPartition0Hw = hwmFor(replicaManager, topic, 0)
@@ -98,9 +98,10 @@ class HighwatermarkPersistenceTest {
     val scheduler = new KafkaScheduler(2)
     scheduler.startup
     val metrics = new Metrics
+    val time = new MockTime
     // create replica manager
-    val replicaManager = new ReplicaManager(configs.head, metrics, new MockTime(), new JMockTime, zkUtils,
-      scheduler, logManagers.head, new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, SystemTime).follower)
+    val replicaManager = new ReplicaManager(configs.head, metrics, time, zkUtils,
+      scheduler, logManagers.head, new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time).follower)
     replicaManager.startup()
     try {
       replicaManager.checkpointHighWatermarks()
@@ -110,7 +111,7 @@ class HighwatermarkPersistenceTest {
       // create leader log
       val topic1Log0 = logManagers.head.createLog(TopicAndPartition(topic1, 0), LogConfig())
       // create a local replica for topic1
-      val leaderReplicaTopic1Partition0 = new Replica(configs.head.brokerId, topic1Partition0, SystemTime, 0, Some(topic1Log0))
+      val leaderReplicaTopic1Partition0 = new Replica(configs.head.brokerId, topic1Partition0, time, 0, Some(topic1Log0))
       topic1Partition0.addReplicaIfNotExists(leaderReplicaTopic1Partition0)
       replicaManager.checkpointHighWatermarks()
       topic1Partition0Hw = hwmFor(replicaManager, topic1, 0)
@@ -126,7 +127,7 @@ class HighwatermarkPersistenceTest {
       // create leader log
       val topic2Log0 = logManagers.head.createLog(TopicAndPartition(topic2, 0), LogConfig())
       // create a local replica for topic2
-      val leaderReplicaTopic2Partition0 =  new Replica(configs.head.brokerId, topic2Partition0, SystemTime, 0, Some(topic2Log0))
+      val leaderReplicaTopic2Partition0 =  new Replica(configs.head.brokerId, topic2Partition0, time, 0, Some(topic2Log0))
       topic2Partition0.addReplicaIfNotExists(leaderReplicaTopic2Partition0)
       replicaManager.checkpointHighWatermarks()
       var topic2Partition0Hw = hwmFor(replicaManager, topic2, 0)
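
From here on the ReplicaManager tests all show the same simplification: the duplicate clocks (a Scala MockTime plus the Java JMockTime) collapse into one MockTime handed to every collaborator, QuotaFactory.instantiate included, so advancing the single clock advances the whole system under test. A sketch of the shape with hypothetical classes:

    import org.apache.kafka.common.utils.Time
    import kafka.utils.MockTime

    // Hypothetical manager that used to take both a Scala and a Java clock.
    class Manager(time: Time) { // before: Manager(time: kafka.utils.Time, jTime: JTime)
      def ageMs(createdMs: Long): Long = time.milliseconds - createdMs
    }

    object SingleClockSketch {
      def main(args: Array[String]): Unit = {
        val time = new MockTime         // one mock now drives every collaborator
        val manager = new Manager(time)
        time.sleep(1000)                // advancing the single clock affects everything
        println(manager.ageMs(0L))
      }
    }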

http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
index 0051247..2d51be9 100644
--- a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
@@ -19,7 +19,8 @@ package kafka.server
 import java.util.Properties
 
 import org.apache.kafka.common.metrics.Metrics
-import org.junit.{Test, Before, After}
+import org.junit.{After, Before, Test}
+
 import collection.mutable.HashMap
 import collection.mutable.Map
 import kafka.cluster.{Partition, Replica}
@@ -28,8 +29,9 @@ import kafka.log.Log
 import org.junit.Assert._
 import kafka.utils._
 import java.util.concurrent.atomic.AtomicBoolean
+
 import kafka.message.MessageSet
-import org.apache.kafka.common.utils.{MockTime => JMockTime}
+import org.apache.kafka.common.utils.Time
 
 class IsrExpirationTest {
 
@@ -44,15 +46,14 @@ class IsrExpirationTest {
   val topic = "foo"
 
   val time = new MockTime
-  val jTime = new JMockTime
   val metrics = new Metrics
 
   var replicaManager: ReplicaManager = null
 
   @Before
   def setUp() {
-    replicaManager = new ReplicaManager(configs.head, metrics, time, jTime, null, null, null, new AtomicBoolean(false),
-      QuotaFactory.instantiate(configs.head, metrics, SystemTime).follower)
+    replicaManager = new ReplicaManager(configs.head, metrics, time, null, null, null, new AtomicBoolean(false),
+      QuotaFactory.instantiate(configs.head, metrics, time).follower)
   }
 
   @After

http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
index e3f0ad2..9cf6318 100755
--- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
@@ -30,7 +30,7 @@ import kafka.utils.TestUtils._
 import kafka.zk.ZooKeeperTestHarness
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.{ApiKeys, Errors, SecurityProtocol}
-import org.apache.kafka.common.utils.SystemTime
+import org.apache.kafka.common.utils.Time
 import org.junit.{After, Before, Test}
 
 class LeaderElectionTest extends ZooKeeperTestHarness {
@@ -134,7 +134,7 @@ class LeaderElectionTest extends ZooKeeperTestHarness {
     val controllerContext = new ControllerContext(zkUtils)
     controllerContext.liveBrokers = brokers.toSet
     val metrics = new Metrics
-    val controllerChannelManager = new ControllerChannelManager(controllerContext, controllerConfig, new SystemTime, metrics)
+    val controllerChannelManager = new ControllerChannelManager(controllerContext, controllerConfig, Time.SYSTEM, metrics)
     controllerChannelManager.startup()
     try {
       val staleControllerEpoch = 0

http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
index 0f3ee63..42e9be1 100755
--- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
@@ -32,7 +32,7 @@ import kafka.utils._
 import kafka.zk.ZooKeeperTestHarness
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.utils.{Time, Utils}
 import org.easymock.{EasyMock, IAnswer}
 import org.junit.Assert._
 import org.junit.{After, Before, Test}

http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
index 64c67d6..e46c41f 100755
--- a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
@@ -24,10 +24,9 @@ import kafka.utils._
 import kafka.utils.TestUtils._
 import kafka.zk.ZooKeeperTestHarness
 import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.utils.{Time, Utils}
 import org.junit.{After, Before, Test}
 import org.junit.Assert._
-
 import java.util.Properties
 import java.io.File
 
@@ -43,7 +42,6 @@ class OffsetCommitTest extends ZooKeeperTestHarness {
   var server: KafkaServer = null
   var logSize: Int = 100
   var simpleConsumer: SimpleConsumer = null
-  var time: Time = new MockTime()
 
   @Before
   override def setUp() {
@@ -53,8 +51,7 @@ class OffsetCommitTest extends ZooKeeperTestHarness {
     config.setProperty(KafkaConfig.OffsetsRetentionCheckIntervalMsProp, retentionCheckInterval.toString)
     val logDirPath = config.getProperty("log.dir")
     logDir = new File(logDirPath)
-    time = new MockTime()
-    server = TestUtils.createServer(KafkaConfig.fromProps(config), time)
+    server = TestUtils.createServer(KafkaConfig.fromProps(config), Time.SYSTEM)
     simpleConsumer = new SimpleConsumer("localhost", server.boundPort(), 1000000, 64*1024, "test-client")
     val consumerMetadataRequest = GroupCoordinatorRequest(group)
     Stream.continually {
@@ -255,7 +252,7 @@ class OffsetCommitTest extends ZooKeeperTestHarness {
     // committed offset should expire
     val commitRequest2 = OffsetCommitRequest(
       groupId = group,
-      requestInfo = immutable.Map(topicPartition -> OffsetAndMetadata(3L, "metadata", SystemTime.milliseconds - 2*24*60*60*1000L)),
+      requestInfo = immutable.Map(topicPartition -> OffsetAndMetadata(3L, "metadata", Time.SYSTEM.milliseconds - 2*24*60*60*1000L)),
       versionId = 1
     )
     assertEquals(Errors.NONE.code, simpleConsumer.commitOffsets(commitRequest2).commitStatus.get(topicPartition).get)

http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
index e226833..378d382 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
@@ -27,17 +27,15 @@ import kafka.message.{ByteBufferMessageSet, Message}
 import kafka.utils._
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.utils.{MockTime => JMockTime}
 import org.apache.kafka.common.requests.FetchRequest.PartitionData
 import org.easymock.EasyMock
-import org.easymock.EasyMock._
+import EasyMock._
 import org.junit.Assert._
 import org.junit.{After, Test}
 
 class ReplicaManagerQuotasTest {
   val configs = TestUtils.createBrokerConfigs(2, TestUtils.MockZkConnect).map(KafkaConfig.fromProps(_, new Properties()))
   val time = new MockTime
-  val jTime = new JMockTime
   val metrics = new Metrics
   val message = new Message("some-data-in-a-message".getBytes())
   val topicAndPartition1 = TopicAndPartition("test-topic", 1)
@@ -175,7 +173,7 @@ class ReplicaManagerQuotasTest {
     expect(logManager.getLog(anyObject())).andReturn(Some(log)).anyTimes()
     replay(logManager)
 
-    replicaManager = new ReplicaManager(configs.head, metrics, time, jTime, zkUtils, scheduler, logManager,
+    replicaManager = new ReplicaManager(configs.head, metrics, time, zkUtils, scheduler, logManager,
       new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time).follower)
 
     //create the two replicas

http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 243e06e..c6d66ba 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -32,7 +32,6 @@ import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{LeaderAndIsrRequest, PartitionState}
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.requests.FetchRequest.PartitionData
-import org.apache.kafka.common.utils.{MockTime => JMockTime}
 import org.apache.kafka.common.{Node, TopicPartition}
 import org.easymock.EasyMock
 import org.junit.Assert.{assertEquals, assertTrue}
@@ -44,8 +43,7 @@ import scala.collection.Map
 class ReplicaManagerTest {
 
   val topic = "test-topic"
-  val time = new MockTime()
-  val jTime = new JMockTime
+  val time = new MockTime
   val metrics = new Metrics
   var zkClient : ZkClient = _
   var zkUtils : ZkUtils = _
@@ -66,7 +64,7 @@ class ReplicaManagerTest {
     val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect)
     val config = KafkaConfig.fromProps(props)
     val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray)
-    val rm = new ReplicaManager(config, metrics, time, jTime, zkUtils, new MockScheduler(time), mockLogMgr,
+    val rm = new ReplicaManager(config, metrics, time, zkUtils, new MockScheduler(time), mockLogMgr,
       new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower)
     try {
       val partition = rm.getOrCreatePartition(topic, 1)
@@ -84,7 +82,7 @@ class ReplicaManagerTest {
     props.put("log.dir", TestUtils.tempRelativeDir("data").getAbsolutePath)
     val config = KafkaConfig.fromProps(props)
     val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray)
-    val rm = new ReplicaManager(config, metrics, time, jTime, zkUtils, new MockScheduler(time), mockLogMgr,
+    val rm = new ReplicaManager(config, metrics, time, zkUtils, new MockScheduler(time), mockLogMgr,
       new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower)
     try {
       val partition = rm.getOrCreatePartition(topic, 1)
@@ -101,7 +99,7 @@ class ReplicaManagerTest {
     val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect)
     val config = KafkaConfig.fromProps(props)
     val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray)
-    val rm = new ReplicaManager(config, metrics, time, jTime, zkUtils, new MockScheduler(time), mockLogMgr,
+    val rm = new ReplicaManager(config, metrics, time, zkUtils, new MockScheduler(time), mockLogMgr,
       new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower, Option(this.getClass.getName))
     try {
       def callback(responseStatus: Map[TopicPartition, PartitionResponse]) = {
@@ -126,7 +124,7 @@ class ReplicaManagerTest {
     props.put("log.dir", TestUtils.tempRelativeDir("data").getAbsolutePath)
     val config = KafkaConfig.fromProps(props)
     val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray)
-    val rm = new ReplicaManager(config, metrics, time, jTime, zkUtils, new MockScheduler(time), mockLogMgr,
+    val rm = new ReplicaManager(config, metrics, time, zkUtils, new MockScheduler(time), mockLogMgr,
       new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower)
 
     try {
@@ -147,8 +145,8 @@ class ReplicaManagerTest {
       EasyMock.expect(metadataCache.getAliveBrokers).andReturn(aliveBrokers).anyTimes()
       EasyMock.replay(metadataCache)
 
-      val brokerList : java.util.List[Integer] = Seq[Integer](0, 1).asJava
-      val brokerSet : java.util.Set[Integer] = Set[Integer](0, 1).asJava
+      val brokerList: java.util.List[Integer] = Seq[Integer](0, 1).asJava
+      val brokerSet: java.util.Set[Integer] = Set[Integer](0, 1).asJava
 
       val partition = rm.getOrCreatePartition(topic, 0)
       partition.getOrCreateReplica(0)
@@ -197,7 +195,7 @@ class ReplicaManagerTest {
     props.put("broker.id", Int.box(0))
     val config = KafkaConfig.fromProps(props)
     val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray)
-    val rm = new ReplicaManager(config, metrics, time, jTime, zkUtils, new MockScheduler(time), mockLogMgr,
+    val rm = new ReplicaManager(config, metrics, time, zkUtils, new MockScheduler(time), mockLogMgr,
       new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower, Option(this.getClass.getName))
     try {
       val aliveBrokers = Seq(new Broker(0, "host0", 0), new Broker(1, "host1", 1), new Broker(1, "host2", 2))

http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
index 340b05f..b1ebeee 100644
--- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -19,17 +19,16 @@ package kafka.server
 import kafka.api._
 import kafka.utils._
 import kafka.cluster.Replica
+import kafka.common.TopicAndPartition
 import kafka.log.Log
 import kafka.message.{ByteBufferMessageSet, Message, MessageSet}
 import kafka.server.QuotaFactory.UnboundedQuota
 import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.utils.{MockTime => JMockTime}
 import org.apache.kafka.common.requests.FetchRequest.PartitionData
 import org.junit.{After, Before, Test}
 import java.util.Properties
 import java.util.concurrent.atomic.AtomicBoolean
 
-import kafka.common.TopicAndPartition
 import org.apache.kafka.common.TopicPartition
 import org.easymock.EasyMock
 import org.junit.Assert._
@@ -48,7 +47,6 @@ class SimpleFetchTest {
 
   // set the replica manager with the partition
   val time = new MockTime
-  val jTime = new JMockTime
   val metrics = new Metrics
   val leaderLEO = 20L
   val followerLEO = 15L
@@ -98,7 +96,7 @@ class SimpleFetchTest {
     EasyMock.replay(logManager)
 
     // create the replica manager
-    replicaManager = new ReplicaManager(configs.head, metrics, time, jTime, zkUtils, scheduler, logManager,
+    replicaManager = new ReplicaManager(configs.head, metrics, time, zkUtils, scheduler, logManager,
       new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time).follower)
 
     // add the partition with two replicas, both in ISR

http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/test/scala/unit/kafka/utils/MockScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/MockScheduler.scala b/core/src/test/scala/unit/kafka/utils/MockScheduler.scala
index e9dbbb1..98ad644 100644
--- a/core/src/test/scala/unit/kafka/utils/MockScheduler.scala
+++ b/core/src/test/scala/unit/kafka/utils/MockScheduler.scala
@@ -19,6 +19,8 @@ package kafka.utils
 import scala.collection.mutable.PriorityQueue
 import java.util.concurrent.TimeUnit
 
+import org.apache.kafka.common.utils.Time
+
 /**
  * A mock scheduler that executes tasks synchronously using a mock time instance. Tasks are executed synchronously when
  * the time is advanced. This class is meant to be used in conjunction with MockTime.
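
A minimal sketch of the interaction described in that comment (the `schedule(name, fun, delay, period, unit)` signature of `kafka.utils.Scheduler` is assumed here; it is not shown in this hunk):

    import java.util.concurrent.TimeUnit
    import kafka.utils.MockTime

    val time = new MockTime()
    var runs = 0
    // one-shot task due 100 ms from "now", on the scheduler owned by MockTime
    time.scheduler.schedule("counter", () => runs += 1, delay = 100, unit = TimeUnit.MILLISECONDS)
    time.sleep(50)    // clock at +50 ms: tick() finds nothing due yet
    assert(runs == 0)
    time.sleep(50)    // clock at +100 ms: tick() fires the task
    assert(runs == 1)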

http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/test/scala/unit/kafka/utils/MockTime.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/MockTime.scala b/core/src/test/scala/unit/kafka/utils/MockTime.scala
index 21fb4d9..2d83d65 100644
--- a/core/src/test/scala/unit/kafka/utils/MockTime.scala
+++ b/core/src/test/scala/unit/kafka/utils/MockTime.scala
@@ -17,45 +17,24 @@
 
 package kafka.utils
 
-import java.util.concurrent._
-
-import org.apache.kafka.common.utils
+import org.apache.kafka.common.utils.{MockTime => JMockTime}
 
 /**
  * A class used for unit testing things which depend on the Time interface.
- * 
- * This class never manually advances the clock, it only does so when you call
- *   sleep(ms)
- * 
- * It also comes with an associated scheduler instance for managing background tasks in
- * a deterministic way.
+ * There are a couple of differences between this class and `org.apache.kafka.common.utils.MockTime`:
+ *
+ * 1. This has an associated scheduler instance for managing background tasks in a deterministic way.
+ * 2. This doesn't support the `auto-tick` functionality as it interacts badly with the current implementation of `MockScheduler`.
  */
-class MockTime(@volatile private var currentMs: Long) extends Time {
-  
-  val scheduler = new MockScheduler(this)
-  
-  def this() = this(System.currentTimeMillis)
-  
-  def milliseconds: Long = currentMs
+class MockTime(currentTimeMs: Long, currentHiResTimeNs: Long) extends JMockTime(0, currentTimeMs, currentHiResTimeNs) {
 
-  def nanoseconds: Long = 
-    TimeUnit.NANOSECONDS.convert(currentMs, TimeUnit.MILLISECONDS)
+  def this() = this(System.currentTimeMillis(), System.nanoTime())
 
-  def sleep(ms: Long) {
-    this.currentMs += ms
+  val scheduler = new MockScheduler(this)
+
+  override def sleep(ms: Long) {
+    super.sleep(ms)
     scheduler.tick()
   }
-  
-  override def toString = "MockTime(%d)".format(milliseconds)
 
 }
-
-object MockTime {
-  implicit def toCommonTime(time: MockTime): utils.Time = new utils.Time {
-    override def nanoseconds(): Long = time.nanoseconds
-
-    override def milliseconds(): Long = time.milliseconds
-
-    override def sleep(ms: Long): Unit = time.sleep(ms)
-  }
-}
\ No newline at end of file
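
The net effect of the subclassing, sketched below (none of these lines are in the patch): a single mock instance now satisfies the unified Java `Time` interface directly, which is what allows the `jTime` constructor arguments and the `toCommonTime` implicit above to be deleted.

    import kafka.utils.MockTime
    import org.apache.kafka.common.utils.{Time => JTime}

    val time = new MockTime()
    val asJavaTime: JTime = time   // compiles now: MockTime is-a common Time
    time.sleep(1000)               // advances the mock clock and ticks the scheduler
    assert(asJavaTime.milliseconds() == time.milliseconds())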

http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index b45600d..33ab58c 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -25,10 +25,12 @@ import java.util.{Properties, Random}
 import java.security.cert.X509Certificate
 import javax.net.ssl.X509TrustManager
 import charset.Charset
+
 import kafka.security.auth.{Acl, Authorizer, Resource}
 import org.apache.kafka.common.protocol.SecurityProtocol
 import org.apache.kafka.common.utils.Utils._
 import org.apache.kafka.test.TestSslUtils
+
 import scala.collection.mutable.{ArrayBuffer, ListBuffer}
 import kafka.server._
 import kafka.producer._
@@ -47,6 +49,7 @@ import org.apache.kafka.clients.consumer.{KafkaConsumer, RangeAssignor}
 import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.common.network.Mode
 import org.apache.kafka.common.serialization.{ByteArraySerializer, Serializer}
+import org.apache.kafka.common.utils.Time
 import org.apache.kafka.test.{TestUtils => JTestUtils}
 
 import scala.collection.Map
@@ -114,7 +117,7 @@ object TestUtils extends Logging {
    *
    * @param config The configuration of the server
    */
-  def createServer(config: KafkaConfig, time: Time = SystemTime): KafkaServer = {
+  def createServer(config: KafkaConfig, time: Time = Time.SYSTEM): KafkaServer = {
     val server = new KafkaServer(config, time)
     server.startup()
     server
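
A sketch of the call site this new default enables (`zkConnect` below is a placeholder for a running ZooKeeper, not something from the patch):

    import kafka.server.KafkaConfig
    import kafka.utils.{MockTime, TestUtils}

    val zkConnect = "localhost:2181"  // hypothetical ZooKeeper address
    val props = TestUtils.createBrokerConfig(0, zkConnect)
    // a MockTime is accepted wherever Time.SYSTEM is now the default
    val server = TestUtils.createServer(KafkaConfig.fromProps(props), new MockTime())
    server.shutdown()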

http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index df6da21..dc6907f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -23,7 +23,6 @@ import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.MetricsReporter;
 import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StreamPartitioner;
@@ -242,7 +241,8 @@ public class KafkaStreams {
      */
     public KafkaStreams(final TopologyBuilder builder, final StreamsConfig config, final KafkaClientSupplier clientSupplier) {
         // create the metrics
-        final Time time = new SystemTime();
+        final Time time = Time.SYSTEM;
+
         processId = UUID.randomUUID();
 
         this.config = config;
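
`Time.SYSTEM` is the interface-level singleton this patch introduces, so callers share one stateless instance instead of each allocating a `SystemTime`:

    import org.apache.kafka.common.utils.Time

    val nowMs = Time.SYSTEM.milliseconds()  // same instance for every caller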

http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index 798c097..2e16af2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -19,7 +19,6 @@ package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.processor.ProcessorContext;
@@ -57,7 +56,7 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
     public MeteredKeyValueStore(final KeyValueStore<K, V> inner, String metricScope, Time time) {
         this.inner = inner;
         this.metricScope = metricScope;
-        this.time = time != null ? time : new SystemTime();
+        this.time = time != null ? time : Time.SYSTEM;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index 09952a3..6533460 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -18,7 +18,6 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.metrics.Sensor;
-import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.KeyValue;
@@ -43,7 +42,7 @@ public class MeteredWindowStore<K, V> implements WindowStore<K, V> {
     public MeteredWindowStore(final WindowStore<K, V> inner, String metricScope, Time time) {
         this.inner = inner;
         this.metricScope = metricScope;
-        this.time = time != null ? time : new SystemTime();
+        this.time = time != null ? time : Time.SYSTEM;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
index 2d87c16..111a271 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
@@ -23,7 +23,6 @@ import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.KafkaClientSupplier;
 import org.apache.kafka.streams.KafkaStreams;
@@ -145,7 +144,7 @@ public class RegexSourceIntegrationTest {
 
         final TestStreamThread testStreamThread = new TestStreamThread(builder, streamsConfig,
             new DefaultKafkaClientSupplier(),
-            originalThread.applicationId, originalThread.clientId, originalThread.processId, new Metrics(), new SystemTime());
+            originalThread.applicationId, originalThread.clientId, originalThread.processId, new Metrics(), Time.SYSTEM);
 
         final TestCondition oneTopicAdded = new TestCondition() {
             @Override
@@ -200,7 +199,7 @@ public class RegexSourceIntegrationTest {
 
         final TestStreamThread testStreamThread = new TestStreamThread(builder, streamsConfig,
             new DefaultKafkaClientSupplier(),
-            originalThread.applicationId, originalThread.clientId, originalThread.processId, new Metrics(), new SystemTime());
+            originalThread.applicationId, originalThread.clientId, originalThread.processId, new Metrics(), Time.SYSTEM);
 
         streamThreads[0] = testStreamThread;
 
@@ -308,7 +307,7 @@ public class RegexSourceIntegrationTest {
 
         final TestStreamThread leaderTestStreamThread = new TestStreamThread(builderLeader, streamsConfig,
                 new DefaultKafkaClientSupplier(),
-                originalLeaderThread.applicationId, originalLeaderThread.clientId, originalLeaderThread.processId, new Metrics(), new SystemTime());
+                originalLeaderThread.applicationId, originalLeaderThread.clientId, originalLeaderThread.processId, new Metrics(), Time.SYSTEM);
 
         leaderStreamThreads[0] = leaderTestStreamThread;
 
@@ -328,7 +327,7 @@ public class RegexSourceIntegrationTest {
 
         final TestStreamThread followerTestStreamThread = new TestStreamThread(builderFollower, streamsConfig,
                 new DefaultKafkaClientSupplier(),
-                originalFollowerThread.applicationId, originalFollowerThread.clientId, originalFollowerThread.processId, new Metrics(), new SystemTime());
+                originalFollowerThread.applicationId, originalFollowerThread.clientId, originalFollowerThread.processId, new Metrics(), Time.SYSTEM);
 
         followerStreamThreads[0] = followerTestStreamThread;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
index 2a1f753..303e3f8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.integration;
 
 import kafka.admin.AdminClient;
+import kafka.server.KafkaConfig$;
 import kafka.tools.StreamsResetter;
 import kafka.utils.MockTime;
 import kafka.utils.ZkUtils;
@@ -63,9 +64,17 @@ import static org.hamcrest.MatcherAssert.assertThat;
  */
 public class ResetIntegrationTest {
     private static final int NUM_BROKERS = 1;
+
     @ClassRule
-    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
-    private final MockTime mockTime = CLUSTER.time;
+    public static final EmbeddedKafkaCluster CLUSTER;
+    static {
+        final Properties props = new Properties();
+        // we double the value passed to `time.sleep` in each iteration in one of the map functions, so we disable
+        // expiration of connections by the brokers to avoid errors when `AdminClient` sends requests after potentially
+        // very long sleep times
+        props.put(KafkaConfig$.MODULE$.ConnectionsMaxIdleMsProp(), -1L);
+        CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS, props);
+    }
 
     private static final String APP_ID = "cleanup-integration-test";
     private static final String INPUT_TOPIC = "inputTopic";
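
Back-of-the-envelope for the comment in the static block above (numbers are illustrative, not taken from the test): the embedded brokers see this shared mock clock, and doubling the mock sleep quickly passes the broker's 10-minute connections.max.idle.ms default, at which point the idle AdminClient connection would be reaped mid-test:

    // 20 doublings starting at 1 ms already total ~17.5 minutes of mock time
    val sleeps = Iterator.iterate(1L)(_ * 2).take(20).toSeq
    println(sleeps.sum)  // 1048575 ms > 600000 ms (the 10-minute default)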
@@ -76,12 +85,14 @@ public class ResetIntegrationTest {
 
     private static final long STREAMS_CONSUMER_TIMEOUT = 2000L;
     private static final long CLEANUP_CONSUMER_TIMEOUT = 2000L;
-    private static final int TIMEOUT_MULTIPLYER = 5;
+    private static final int TIMEOUT_MULTIPLIER = 5;
 
-    private final WaitUntilConsumerGroupGotClosed consumerGroupInactive = new WaitUntilConsumerGroupGotClosed();
     private static int testNo = 0;
     private static AdminClient adminClient = null;
 
+    private final MockTime mockTime = CLUSTER.time;
+    private final WaitUntilConsumerGroupGotClosed consumerGroupInactive = new WaitUntilConsumerGroupGotClosed();
+
     @BeforeClass
     public static void startKafkaCluster() throws Exception {
         CLUSTER.createTopic(INPUT_TOPIC);
@@ -111,8 +122,8 @@ public class ResetIntegrationTest {
             Thread.sleep(50);
 
             try {
-                TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLYER * CLEANUP_CONSUMER_TIMEOUT,
-                        "Test consumer group active even after waiting " + (TIMEOUT_MULTIPLYER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
+                TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
+                        "Test consumer group active even after waiting " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
             } catch (GroupCoordinatorNotAvailableException e) {
                 continue;
             } catch (IllegalArgumentException e) {
@@ -154,15 +165,15 @@ public class ResetIntegrationTest {
         ).get(0);
 
         streams.close();
-        TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLYER * STREAMS_CONSUMER_TIMEOUT,
-            "Streams Application consumer group did not time out after " + (TIMEOUT_MULTIPLYER * STREAMS_CONSUMER_TIMEOUT) + " ms.");
+        TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT,
+            "Streams Application consumer group did not time out after " + (TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT) + " ms.");
 
         // RESET
         streams = new KafkaStreams(setupTopologyWithIntermediateUserTopic(OUTPUT_TOPIC_2_RERUN), streamsConfiguration);
         streams.cleanUp();
         cleanGlobal(INTERMEDIATE_USER_TOPIC);
-        TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLYER * CLEANUP_CONSUMER_TIMEOUT,
-            "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLYER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
+        TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
+            "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
 
         assertInternalTopicsGotDeleted(INTERMEDIATE_USER_TOPIC);
 
@@ -183,8 +194,8 @@ public class ResetIntegrationTest {
         assertThat(resultRerun, equalTo(result));
         assertThat(resultRerun2, equalTo(result2));
 
-        TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLYER * CLEANUP_CONSUMER_TIMEOUT,
-                "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLYER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
+        TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
+                "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
         cleanGlobal(INTERMEDIATE_USER_TOPIC);
 
         CLUSTER.deleteTopic(INTERMEDIATE_USER_TOPIC);
@@ -227,15 +238,15 @@ public class ResetIntegrationTest {
                 60000);
 
         streams.close();
-        TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLYER * STREAMS_CONSUMER_TIMEOUT,
-                "Streams Application consumer group did not time out after " + (TIMEOUT_MULTIPLYER * STREAMS_CONSUMER_TIMEOUT) + " ms.");
+        TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT,
+                "Streams Application consumer group did not time out after " + (TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT) + " ms.");
 
         // RESET
         streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfiguration);
         streams.cleanUp();
         cleanGlobal(null);
-        TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLYER * CLEANUP_CONSUMER_TIMEOUT,
-                "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLYER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
+        TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
+                "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
 
         assertInternalTopicsGotDeleted(null);
 
@@ -250,8 +261,8 @@ public class ResetIntegrationTest {
 
         assertThat(resultRerun, equalTo(result));
 
-        TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLYER * CLEANUP_CONSUMER_TIMEOUT,
-                "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLYER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
+        TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
+                "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
         cleanGlobal(null);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
index fc28ad5..9bef23e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
@@ -36,29 +36,35 @@ public class EmbeddedKafkaCluster extends ExternalResource {
     private static final int DEFAULT_BROKER_PORT = 0; // 0 results in a random port being selected
     private EmbeddedZookeeper zookeeper = null;
     private final KafkaEmbedded[] brokers;
+    private final Properties brokerConfig;
 
     public EmbeddedKafkaCluster(final int numBrokers) {
+        this(numBrokers, new Properties());
+    }
+
+    public EmbeddedKafkaCluster(final int numBrokers, final Properties brokerConfig) {
         brokers = new KafkaEmbedded[numBrokers];
+        this.brokerConfig = brokerConfig;
     }
 
-    public MockTime time = new MockTime();
+    public final MockTime time = new MockTime();
 
     /**
      * Creates and starts a Kafka cluster.
      */
     public void start() throws IOException, InterruptedException {
-        final Properties brokerConfig = new Properties();
-
         log.debug("Initiating embedded Kafka cluster startup");
         log.debug("Starting a ZooKeeper instance");
         zookeeper = new EmbeddedZookeeper();
         log.debug("ZooKeeper instance is running at {}", zKConnectString());
+
         brokerConfig.put(KafkaConfig$.MODULE$.ZkConnectProp(), zKConnectString());
         brokerConfig.put(KafkaConfig$.MODULE$.PortProp(), DEFAULT_BROKER_PORT);
-        brokerConfig.put(KafkaConfig$.MODULE$.DeleteTopicEnableProp(), true);
-        brokerConfig.put(KafkaConfig$.MODULE$.LogCleanerDedupeBufferSizeProp(), 2 * 1024 * 1024L);
-        brokerConfig.put(KafkaConfig$.MODULE$.GroupMinSessionTimeoutMsProp(), 0);
-        brokerConfig.put(KafkaConfig$.MODULE$.AutoCreateTopicsEnableProp(), false);
+
+        putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.DeleteTopicEnableProp(), true);
+        putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.LogCleanerDedupeBufferSizeProp(), 2 * 1024 * 1024L);
+        putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.GroupMinSessionTimeoutMsProp(), 0);
+        putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.AutoCreateTopicsEnableProp(), false);
 
         for (int i = 0; i < brokers.length; i++) {
             brokerConfig.put(KafkaConfig$.MODULE$.BrokerIdProp(), i);
@@ -70,6 +76,11 @@ public class EmbeddedKafkaCluster extends ExternalResource {
         }
     }
 
+    private void putIfAbsent(final Properties props, final String propertyKey, final Object propertyValue) {
+        if (!props.containsKey(propertyKey))
+            props.put(propertyKey, propertyValue);
+    }
+
     /**
      * Stop the Kafka cluster.
      */
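
A sketch of what the two-argument constructor permits (the property mirrors the ResetIntegrationTest change above; the rest is assumed test scaffolding). Per-test overrides win, and the putIfAbsent calls fill in the remaining defaults:

    import java.util.Properties
    import kafka.server.KafkaConfig
    import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster

    val brokerProps = new Properties()
    brokerProps.put(KafkaConfig.ConnectionsMaxIdleMsProp, Long.box(-1L))  // override a default
    val cluster = new EmbeddedKafkaCluster(1, brokerProps)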

http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index 117e6ff..fe20225 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -17,7 +17,6 @@
 
 package org.apache.kafka.streams.integration.utils;
 
-import kafka.utils.Time;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -25,6 +24,7 @@ import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;