Posted to commits@kafka.apache.org by ij...@apache.org on 2016/12/02 14:01:34 UTC

[3/3] kafka git commit: KAFKA-2247; Merge kafka.utils.Time and kafka.common.utils.Time

KAFKA-2247; Merge kafka.utils.Time and kafka.common.utils.Time

Also:
* Make all implementations of `Time` thread-safe, since they are accessed from multiple threads in some cases.
* Change the default `MockTime` implementation to track `nanoTime` and `currentTimeMillis` in two separate variables, since the two clocks have different origins (see the sketch below).
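
For context on the second point: `System.currentTimeMillis()` and `System.nanoTime()` count from unrelated origins, so a value read from one clock must never be combined with a value from the other. A minimal illustrative sketch of the pitfall (not code from this commit):

    import java.util.concurrent.TimeUnit;

    public class ClockOrigins {
        public static void main(String[] args) throws InterruptedException {
            long wallMs = System.currentTimeMillis(); // ms since the Unix epoch
            long monoNs = System.nanoTime();          // ns since an arbitrary, JVM-specific origin
            // Wrong: mixing the two clocks yields a meaningless "elapsed" value.
            long bogusMs = wallMs - TimeUnit.NANOSECONDS.toMillis(monoNs);
            // Right: elapsed time is measured against a single clock.
            long startNs = System.nanoTime();
            Thread.sleep(10);
            long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs);
            System.out.println("bogus=" + bogusMs + " elapsed=" + elapsedMs);
        }
    }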

Author: Ismael Juma <is...@juma.me.uk>

Reviewers: Ewen Cheslack-Postava <ew...@confluent.io>, Shikhar Bhushan <sh...@confluent.io>, Jason Gustafson <ja...@confluent.io>, Eno Thereska <en...@gmail.com>, Damian Guy <da...@gmail.com>

Closes #2095 from ijuma/kafka-2247-consolidate-time-interfaces


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/128d0ff9
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/128d0ff9
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/128d0ff9

Branch: refs/heads/trunk
Commit: 128d0ff91d84a3a1f5a5237133f9ec01caf18d66
Parents: ea370be
Author: Ismael Juma <is...@juma.me.uk>
Authored: Fri Dec 2 14:00:58 2016 +0000
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Fri Dec 2 14:00:58 2016 +0000

----------------------------------------------------------------------
 .../kafka/clients/consumer/KafkaConsumer.java   |  3 +-
 .../kafka/clients/producer/KafkaProducer.java   |  5 +-
 .../apache/kafka/common/metrics/Metrics.java    |  3 +-
 .../common/security/kerberos/KerberosLogin.java |  5 +-
 .../apache/kafka/common/utils/SystemTime.java   | 10 ++-
 .../org/apache/kafka/common/utils/Time.java     | 23 ++++++-
 .../producer/internals/BufferPoolTest.java      | 11 ++--
 .../internals/RecordAccumulatorTest.java        |  5 +-
 .../org/apache/kafka/common/utils/MockTime.java | 39 +++++++++---
 .../org/apache/kafka/test/Microbenchmarks.java  |  4 +-
 .../kafka/connect/cli/ConnectDistributed.java   |  3 +-
 .../kafka/connect/cli/ConnectStandalone.java    |  3 +-
 .../storage/KafkaConfigBackingStore.java        |  4 +-
 .../storage/KafkaOffsetBackingStore.java        |  4 +-
 .../connect/runtime/WorkerSourceTaskTest.java   |  4 +-
 .../org/apache/kafka/connect/util/MockTime.java | 16 +++--
 .../main/scala/kafka/admin/AdminClient.scala    |  7 +--
 .../main/scala/kafka/cluster/Partition.scala    |  1 +
 core/src/main/scala/kafka/cluster/Replica.scala |  9 +--
 .../ZkNodeChangeNotificationListener.scala      |  8 ++-
 .../kafka/consumer/ConsumerFetcherManager.scala |  9 ++-
 .../consumer/ZookeeperConsumerConnector.scala   |  3 +-
 .../kafka/controller/KafkaController.scala      |  2 +-
 .../main/scala/kafka/log/FileMessageSet.scala   |  3 +-
 core/src/main/scala/kafka/log/Log.scala         | 23 ++++---
 core/src/main/scala/kafka/log/LogCleaner.scala  |  5 +-
 .../scala/kafka/log/LogCleanerManager.scala     |  3 +-
 core/src/main/scala/kafka/log/LogManager.scala  |  3 +-
 core/src/main/scala/kafka/log/LogSegment.scala  |  8 +--
 .../scala/kafka/network/RequestChannel.scala    |  9 +--
 .../producer/async/DefaultEventHandler.scala    | 16 ++---
 .../producer/async/ProducerSendThread.scala     | 14 +++--
 .../scala/kafka/server/DelayedOperation.scala   |  2 -
 .../kafka/server/DynamicConfigManager.scala     |  9 +--
 .../src/main/scala/kafka/server/KafkaApis.scala | 19 +++---
 .../kafka/server/KafkaRequestHandler.scala      | 15 +++--
 .../main/scala/kafka/server/KafkaServer.scala   | 35 +++++------
 .../scala/kafka/server/ReplicaManager.scala     |  9 ++-
 .../kafka/server/ZookeeperLeaderElector.scala   |  8 ++-
 .../kafka/tools/ReplicaVerificationTool.scala   | 16 +++--
 .../kafka/tools/SimpleConsumerPerformance.scala |  6 +-
 .../main/scala/kafka/utils/DelayedItem.scala    | 12 ++--
 .../kafka/utils/NetworkClientBlockingOps.scala  | 10 +--
 core/src/main/scala/kafka/utils/Throttler.scala | 31 ++++++----
 core/src/main/scala/kafka/utils/Time.scala      | 65 --------------------
 core/src/main/scala/kafka/utils/ZkUtils.scala   |  3 +-
 .../main/scala/kafka/utils/timer/Timer.scala    |  7 +--
 .../scala/kafka/utils/timer/TimerTaskList.scala |  9 +--
 .../other/kafka/TestLinearWriteSpeed.scala      |  4 +-
 .../other/kafka/TestPurgatoryPerformance.scala  |  5 +-
 .../api/RequestResponseSerializationTest.scala  | 14 ++---
 .../ZkNodeChangeNotificationListenerTest.scala  |  2 +-
 .../controller/ControllerFailoverTest.scala     |  6 +-
 .../unit/kafka/log/BrokerCompressionTest.scala  |  2 +-
 .../log/LogCleanerLagIntegrationTest.scala      |  2 +-
 .../unit/kafka/log/LogCleanerManagerTest.scala  |  2 +-
 .../scala/unit/kafka/log/LogSegmentTest.scala   | 11 ++--
 .../unit/kafka/network/SocketServerTest.scala   | 12 ++--
 .../unit/kafka/producer/AsyncProducerTest.scala |  2 +
 .../unit/kafka/producer/ProducerTest.scala      |  5 +-
 .../unit/kafka/producer/SyncProducerTest.scala  | 13 ++--
 .../kafka/server/DelayedOperationTest.scala     |  6 +-
 .../server/HighwatermarkPersistenceTest.scala   | 21 ++++---
 .../unit/kafka/server/ISRExpirationTest.scala   | 11 ++--
 .../unit/kafka/server/LeaderElectionTest.scala  |  4 +-
 .../scala/unit/kafka/server/LogOffsetTest.scala |  2 +-
 .../unit/kafka/server/OffsetCommitTest.scala    |  9 +--
 .../kafka/server/ReplicaManagerQuotasTest.scala |  6 +-
 .../unit/kafka/server/ReplicaManagerTest.scala  | 18 +++---
 .../unit/kafka/server/SimpleFetchTest.scala     |  6 +-
 .../scala/unit/kafka/utils/MockScheduler.scala  |  2 +
 .../test/scala/unit/kafka/utils/MockTime.scala  | 43 ++++---------
 .../test/scala/unit/kafka/utils/TestUtils.scala |  5 +-
 .../org/apache/kafka/streams/KafkaStreams.java  |  4 +-
 .../state/internals/MeteredKeyValueStore.java   |  3 +-
 .../state/internals/MeteredWindowStore.java     |  3 +-
 .../integration/RegexSourceIntegrationTest.java |  9 ++-
 .../integration/ResetIntegrationTest.java       | 47 ++++++++------
 .../integration/utils/EmbeddedKafkaCluster.java | 25 +++++---
 .../integration/utils/IntegrationTestUtils.java |  2 +-
 .../internals/StreamPartitionAssignorTest.java  | 30 ++++-----
 .../processor/internals/StreamThreadTest.java   |  4 +-
 .../StreamThreadStateStoreProviderTest.java     |  6 +-
 83 files changed, 435 insertions(+), 432 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index e273a04..93aa739 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -40,7 +40,6 @@ import org.apache.kafka.common.network.Selector;
 import org.apache.kafka.common.requests.MetadataRequest;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.utils.AppInfoParser;
-import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
@@ -604,7 +603,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
             int fetchMaxWaitMs = config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG);
             if (this.requestTimeoutMs <= sessionTimeOutMs || this.requestTimeoutMs <= fetchMaxWaitMs)
                 throw new ConfigException(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG + " should be greater than " + ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG + " and " + ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG);
-            this.time = new SystemTime();
+            this.time = Time.SYSTEM;
 
             String clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG);
             if (clientId.length() <= 0)

http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 54a5474..9dd8459 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -45,7 +45,6 @@ import org.apache.kafka.common.record.Records;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.common.utils.KafkaThread;
-import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -208,7 +207,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             log.trace("Starting the Kafka producer");
             Map<String, Object> userProvidedConfigs = config.originals();
             this.producerConfig = config;
-            this.time = new SystemTime();
+            this.time = Time.SYSTEM;
 
             clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
             if (clientId.length() <= 0)
@@ -317,7 +316,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                     (short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)),
                     config.getInt(ProducerConfig.RETRIES_CONFIG),
                     this.metrics,
-                    new SystemTime(),
+                    Time.SYSTEM,
                     this.requestTimeoutMs);
             String ioThreadName = "kafka-producer-network-thread" + (clientId.length() > 0 ? " | " + clientId : "");
             this.ioThread = new KafkaThread(ioThreadName, this.sender, true);

http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
index afca2e5..bd20e13 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
@@ -26,7 +26,6 @@ import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.kafka.common.MetricName;
-import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
@@ -97,7 +96,7 @@ public class Metrics implements Closeable {
      * @param defaultConfig The default config to use for all metrics that don't override their config
      */
     public Metrics(MetricConfig defaultConfig) {
-        this(defaultConfig, new ArrayList<MetricsReporter>(0), new SystemTime());
+        this(defaultConfig, new ArrayList<MetricsReporter>(0), Time.SYSTEM);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java b/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java
index ff9f218..48f9f93 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java
@@ -32,7 +32,6 @@ import org.apache.kafka.common.security.authenticator.AbstractLogin;
 import org.apache.kafka.common.config.SaslConfigs;
 import org.apache.kafka.common.utils.Shell;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -52,7 +51,7 @@ public class KerberosLogin extends AbstractLogin {
 
     private static final Random RNG = new Random();
 
-    private final Time time = new SystemTime();
+    private final Time time = Time.SYSTEM;
     private Thread t;
     private boolean isKrbTicket;
     private boolean isUsingTicketCache;
@@ -383,7 +382,7 @@ public class KerberosLogin extends AbstractLogin {
     }
 
     private long currentElapsedTime() {
-        return time.nanoseconds() / 1000000;
+        return time.hiResClockMs();
     }
 
     private long currentWallTime() {

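The `KerberosLogin` change above replaces a hand-rolled nanosecond-to-millisecond division with the new `hiResClockMs()` helper. Both readings come from the monotonic clock and are only meaningful for measuring elapsed time, unlike `milliseconds()`, which is wall-clock time. A sketch of the equivalence, assuming the merged `Time` interface from this commit is on the classpath:

    import org.apache.kafka.common.utils.Time;

    public class ElapsedVsWall {
        public static void main(String[] args) {
            Time time = Time.SYSTEM;
            long manualMs = time.nanoseconds() / 1000000; // old style: manual conversion
            long helperMs = time.hiResClockMs();          // new style: same clock, same units
            long wallMs = time.milliseconds();            // epoch-based; not comparable to the above
            System.out.println(manualMs + " ~ " + helperMs + ", wall=" + wallMs);
        }
    }
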
http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java b/clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java
index 18725de..1e32c6c 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java
@@ -16,8 +16,11 @@
  */
 package org.apache.kafka.common.utils;
 
+import java.util.concurrent.TimeUnit;
+
 /**
- * A time implementation that uses the system clock and sleep call
+ * A time implementation that uses the system clock and sleep call. Use `Time.SYSTEM` instead of creating an instance
+ * of this class.
  */
 public class SystemTime implements Time {
 
@@ -27,6 +30,11 @@ public class SystemTime implements Time {
     }
 
     @Override
+    public long hiResClockMs() {
+        return TimeUnit.NANOSECONDS.toMillis(nanoseconds());
+    }
+
+    @Override
     public long nanoseconds() {
         return System.nanoTime();
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/clients/src/main/java/org/apache/kafka/common/utils/Time.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Time.java b/clients/src/main/java/org/apache/kafka/common/utils/Time.java
index b2fad7f..c782619 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/Time.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Time.java
@@ -17,17 +17,34 @@
 package org.apache.kafka.common.utils;
 
 /**
- * An interface abstracting the clock to use in unit testing classes that make use of clock time
+ * An interface abstracting the clock to use in unit testing classes that make use of clock time.
+ *
+ * Implementations of this class should be thread-safe.
  */
 public interface Time {
 
+    Time SYSTEM = new SystemTime();
+
     /**
-     * The current time in milliseconds
+     * Returns the current time in milliseconds.
      */
     long milliseconds();
 
     /**
-     * The current time in nanoseconds
+     * Returns the value returned by `nanoseconds` converted into milliseconds.
+     */
+    long hiResClockMs();
+
+    /**
+     * Returns the current value of the running JVM's high-resolution time source, in nanoseconds.
+     *
+     * <p>This method can only be used to measure elapsed time and is
+     * not related to any other notion of system or wall-clock time.
+     * The value returned represents nanoseconds since some fixed but
+     * arbitrary <i>origin</i> time (perhaps in the future, so values
+     * may be negative).  The same origin is used by all invocations of
+     * this method in an instance of a Java virtual machine; other
+     * virtual machine instances are likely to use a different origin.
      */
     long nanoseconds();
 

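With the Scala and Java time abstractions merged, callers depend on this single interface and obtain the production clock via the `Time.SYSTEM` constant rather than instantiating `SystemTime`. A minimal usage sketch against the interface as shown in this diff (the `timedWorkMs` helper is illustrative, not from the commit):

    import org.apache.kafka.common.utils.Time;

    public class TimeDemo {
        // Accepting Time rather than a concrete clock keeps the caller testable with a mock.
        static long timedWorkMs(Time time) {
            long startNs = time.nanoseconds();
            time.sleep(5); // a real sleep under Time.SYSTEM; an instant advance under a mock
            return (time.nanoseconds() - startNs) / 1_000_000;
        }

        public static void main(String[] args) {
            System.out.println(timedWorkMs(Time.SYSTEM) + " ms");
        }
    }
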
http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
index 3756d8a..41ac4f0 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
@@ -19,7 +19,7 @@ package org.apache.kafka.clients.producer.internals;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
 import org.junit.Test;
@@ -40,7 +40,6 @@ import static org.junit.Assert.assertEquals;
 
 public class BufferPoolTest {
     private final MockTime time = new MockTime();
-    private final SystemTime systemTime = new SystemTime();
     private final Metrics metrics = new Metrics(time);
     private final long maxBlockTimeMs = 2000;
     private final String metricGroup = "TestMetrics";
@@ -124,7 +123,7 @@ public class BufferPoolTest {
     private void delayedDeallocate(final BufferPool pool, final ByteBuffer buffer, final long delayMs) {
         Thread thread = new Thread() {
             public void run() {
-                systemTime.sleep(delayMs);
+                Time.SYSTEM.sleep(delayMs);
                 pool.deallocate(buffer);
             }
         };
@@ -154,7 +153,7 @@ public class BufferPoolTest {
      */
     @Test
     public void testBlockTimeout() throws Exception {
-        BufferPool pool = new BufferPool(10, 1, metrics, systemTime, metricGroup);
+        BufferPool pool = new BufferPool(10, 1, metrics, Time.SYSTEM, metricGroup);
         ByteBuffer buffer1 = pool.allocate(1, maxBlockTimeMs);
         ByteBuffer buffer2 = pool.allocate(1, maxBlockTimeMs);
         ByteBuffer buffer3 = pool.allocate(1, maxBlockTimeMs);
@@ -164,14 +163,14 @@ public class BufferPoolTest {
         // The third buffer will be de-allocated after maxBlockTimeMs since the most recent de-allocation
         delayedDeallocate(pool, buffer3, maxBlockTimeMs / 2 * 5);
 
-        long beginTimeMs = systemTime.milliseconds();
+        long beginTimeMs = Time.SYSTEM.milliseconds();
         try {
             pool.allocate(10, maxBlockTimeMs);
             fail("The buffer allocated more memory than its maximum value 10");
         } catch (TimeoutException e) {
             // this is good
         }
-        long endTimeMs = systemTime.milliseconds();
+        long endTimeMs = Time.SYSTEM.milliseconds();
         assertTrue("Allocation should finish not much later than maxBlockTimeMs", endTimeMs - beginTimeMs < maxBlockTimeMs + 1000);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index 216f07e..28521e8 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -41,7 +41,7 @@ import org.apache.kafka.common.record.LogEntry;
 import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.record.Records;
 import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.utils.Time;
 import org.junit.After;
 import org.junit.Test;
 
@@ -60,7 +60,6 @@ public class RecordAccumulatorTest {
     private PartitionInfo part2 = new PartitionInfo(topic, partition2, node1, null, null);
     private PartitionInfo part3 = new PartitionInfo(topic, partition3, node2, null, null);
     private MockTime time = new MockTime();
-    private SystemTime systemTime = new SystemTime();
     private byte[] key = "key".getBytes();
     private byte[] value = "value".getBytes();
     private int msgSize = Records.LOG_OVERHEAD + Record.recordSize(key, value);
@@ -294,7 +293,7 @@ public class RecordAccumulatorTest {
     private void delayedInterrupt(final Thread thread, final long delayMs) {
         Thread t = new Thread() {
             public void run() {
-                systemTime.sleep(delayMs);
+                Time.SYSTEM.sleep(delayMs);
                 thread.interrupt();
             }
         };

http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/clients/src/test/java/org/apache/kafka/common/utils/MockTime.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/MockTime.java b/clients/src/test/java/org/apache/kafka/common/utils/MockTime.java
index 533f869..8178f4c 100644
--- a/clients/src/test/java/org/apache/kafka/common/utils/MockTime.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/MockTime.java
@@ -13,39 +13,60 @@
 package org.apache.kafka.common.utils;
 
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * A clock that you can manually advance by calling sleep
  */
 public class MockTime implements Time {
 
-    private long nanos = 0;
-    private long autoTickMs = 0;
+    private final long autoTickMs;
+
+    // Values from `nanoTime` and `currentTimeMillis` are not comparable, so we store them separately to allow tests
+    // using this class to detect bugs where this is incorrectly assumed to be true
+    private final AtomicLong timeMs;
+    private final AtomicLong highResTimeNs;
 
     public MockTime() {
-        this.nanos = System.nanoTime();
+        this(0);
     }
 
     public MockTime(long autoTickMs) {
-        this.nanos = System.nanoTime();
+        this(autoTickMs, System.currentTimeMillis(), System.nanoTime());
+    }
+
+    public MockTime(long autoTickMs, long currentTimeMs, long currentHighResTimeNs) {
+        this.timeMs = new AtomicLong(currentTimeMs);
+        this.highResTimeNs = new AtomicLong(currentHighResTimeNs);
         this.autoTickMs = autoTickMs;
     }
 
     @Override
     public long milliseconds() {
-        this.sleep(autoTickMs);
-        return TimeUnit.MILLISECONDS.convert(this.nanos, TimeUnit.NANOSECONDS);
+        maybeSleep(autoTickMs);
+        return timeMs.get();
     }
 
     @Override
     public long nanoseconds() {
-        this.sleep(autoTickMs);
-        return nanos;
+        maybeSleep(autoTickMs);
+        return highResTimeNs.get();
+    }
+
+    @Override
+    public long hiResClockMs() {
+        return TimeUnit.NANOSECONDS.toMillis(nanoseconds());
+    }
+
+    private void maybeSleep(long ms) {
+        if (ms != 0)
+            sleep(ms);
     }
 
     @Override
     public void sleep(long ms) {
-        this.nanos += TimeUnit.NANOSECONDS.convert(ms, TimeUnit.MILLISECONDS);
+        timeMs.addAndGet(ms);
+        highResTimeNs.addAndGet(TimeUnit.MILLISECONDS.toNanos(ms));
     }
 
 }

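Because the two clocks are now stored in separate `AtomicLong`s, `sleep` advances both consistently while keeping them thread-safe and mutually incomparable. A small sketch of driving the rewritten `MockTime` from a test, using only the constructor and methods in the diff above:

    import org.apache.kafka.common.utils.MockTime;

    public class MockTimeDemo {
        public static void main(String[] args) {
            // No auto-tick; explicit origins of 0 ms (wall) and 0 ns (monotonic) for determinism.
            MockTime time = new MockTime(0, 0L, 0L);
            time.sleep(1000); // advances both clocks by one second without real waiting
            System.out.println(time.milliseconds());  // 1000
            System.out.println(time.hiResClockMs());  // 1000, derived from the nanosecond clock
        }
    }
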
http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/clients/src/test/java/org/apache/kafka/test/Microbenchmarks.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/Microbenchmarks.java b/clients/src/test/java/org/apache/kafka/test/Microbenchmarks.java
index 8cd19b2..f7a47b1 100644
--- a/clients/src/test/java/org/apache/kafka/test/Microbenchmarks.java
+++ b/clients/src/test/java/org/apache/kafka/test/Microbenchmarks.java
@@ -23,7 +23,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.kafka.common.utils.CopyOnWriteMap;
-import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.utils.Time;
 
 public class Microbenchmarks {
 
@@ -71,7 +71,7 @@ public class Microbenchmarks {
         System.out.println(loc);
         System.out.println("binary search: " + (System.nanoTime() - start) / iters);
 
-        final SystemTime time = new SystemTime();
+        final Time time = Time.SYSTEM;
         final AtomicBoolean done = new AtomicBoolean(false);
         final Object lock = new Object();
         Thread t1 = new Thread() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
index c3a61b2..fc957a7 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
@@ -18,7 +18,6 @@
 package org.apache.kafka.connect.cli;
 
 import org.apache.kafka.common.annotation.InterfaceStability;
-import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.runtime.Connect;
@@ -62,7 +61,7 @@ public class ConnectDistributed {
         Map<String, String> workerProps = !workerPropsFile.isEmpty() ?
                 Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : Collections.<String, String>emptyMap();
 
-        Time time = new SystemTime();
+        Time time = Time.SYSTEM;
         ConnectorFactory connectorFactory = new ConnectorFactory();
         DistributedConfig config = new DistributedConfig(workerProps);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
index 65a71af..c125a33 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
@@ -18,7 +18,6 @@
 package org.apache.kafka.connect.cli;
 
 import org.apache.kafka.common.annotation.InterfaceStability;
-import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.runtime.Connect;
@@ -67,7 +66,7 @@ public class ConnectStandalone {
         Map<String, String> workerProps = !workerPropsFile.isEmpty() ?
                 Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : Collections.<String, String>emptyMap();
 
-        Time time = new SystemTime();
+        Time time = Time.SYSTEM;
         ConnectorFactory connectorFactory = new ConnectorFactory();
         StandaloneConfig config = new StandaloneConfig(workerProps);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
index 1a46693..03d84ae 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
@@ -25,7 +25,7 @@ import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
@@ -421,7 +421,7 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
 
     private KafkaBasedLog<String, byte[]> createKafkaBasedLog(String topic, Map<String, Object> producerProps,
                                                               Map<String, Object> consumerProps, Callback<ConsumerRecord<String, byte[]>> consumedCallback) {
-        return new KafkaBasedLog<>(topic, producerProps, consumerProps, consumedCallback, new SystemTime());
+        return new KafkaBasedLog<>(topic, producerProps, consumerProps, consumedCallback, Time.SYSTEM);
     }
 
     @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
index 9219986..4d66288 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
@@ -24,7 +24,7 @@ import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
 import org.apache.kafka.connect.util.Callback;
@@ -138,7 +138,7 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore {
 
     private KafkaBasedLog<byte[], byte[]> createKafkaBasedLog(String topic, Map<String, Object> producerProps,
                                                               Map<String, Object> consumerProps, Callback<ConsumerRecord<byte[], byte[]>> consumedCallback) {
-        return new KafkaBasedLog<>(topic, producerProps, consumerProps, consumedCallback, new SystemTime());
+        return new KafkaBasedLog<>(topic, producerProps, consumerProps, consumedCallback, Time.SYSTEM);
     }
 
     private static class SetCallbackFuture implements org.apache.kafka.clients.producer.Callback, Future<Void> {

http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
index 30c7118..86f5797 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@@ -21,7 +21,7 @@ import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
 import org.apache.kafka.connect.source.SourceRecord;
@@ -125,7 +125,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
 
     private void createWorkerTask(TargetState initialState) {
         workerTask = new WorkerSourceTask(taskId, sourceTask, statusListener, initialState, keyConverter,
-                valueConverter, producer, offsetReader, offsetWriter, config, new SystemTime());
+                valueConverter, producer, offsetReader, offsetWriter, config, Time.SYSTEM);
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/connect/runtime/src/test/java/org/apache/kafka/connect/util/MockTime.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/MockTime.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/MockTime.java
index 85f6895..e13ddb0 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/MockTime.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/MockTime.java
@@ -19,31 +19,37 @@ package org.apache.kafka.connect.util;
 import org.apache.kafka.common.utils.Time;
 
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * A clock that you can manually advance by calling sleep
  */
 public class MockTime implements Time {
 
-    private long nanos = 0;
+    private final AtomicLong nanos;
 
     public MockTime() {
-        this.nanos = System.nanoTime();
+        this.nanos = new AtomicLong(System.nanoTime());
     }
 
     @Override
     public long milliseconds() {
-        return TimeUnit.MILLISECONDS.convert(this.nanos, TimeUnit.NANOSECONDS);
+        return TimeUnit.MILLISECONDS.convert(this.nanos.get(), TimeUnit.NANOSECONDS);
+    }
+
+    @Override
+    public long hiResClockMs() {
+        return TimeUnit.NANOSECONDS.toMillis(nanos.get());
     }
 
     @Override
     public long nanoseconds() {
-        return nanos;
+        return nanos.get();
     }
 
     @Override
     public void sleep(long ms) {
-        this.nanos += TimeUnit.NANOSECONDS.convert(ms, TimeUnit.MILLISECONDS);
+        this.nanos.addAndGet(TimeUnit.NANOSECONDS.convert(ms, TimeUnit.MILLISECONDS));
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/main/scala/kafka/admin/AdminClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala
index 9cd4823..a7e7ebc 100644
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -17,7 +17,7 @@ import java.util.{Collections, Properties}
 import java.util.concurrent.atomic.AtomicInteger
 
 import kafka.common.KafkaException
-import kafka.coordinator.{GroupOverview, MemberSummary}
+import kafka.coordinator.GroupOverview
 import kafka.utils.Logging
 import org.apache.kafka.clients._
 import org.apache.kafka.clients.consumer.internals.{ConsumerNetworkClient, ConsumerProtocol, RequestFuture}
@@ -25,10 +25,9 @@ import org.apache.kafka.common.config.ConfigDef.{Importance, Type}
 import org.apache.kafka.common.config.{AbstractConfig, ConfigDef}
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.Selector
-import org.apache.kafka.common.protocol.types.Struct
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests._
-import org.apache.kafka.common.utils.{SystemTime, Time, Utils}
+import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.common.{Cluster, Node, TopicPartition}
 
 import scala.collection.JavaConverters._
@@ -200,7 +199,7 @@ object AdminClient {
   def create(props: Map[String, _]): AdminClient = create(new AdminConfig(props))
 
   def create(config: AdminConfig): AdminClient = {
-    val time = new SystemTime
+    val time = Time.SYSTEM
     val metrics = new Metrics(time)
     val metadata = new Metadata
     val channelBuilder = ClientUtils.createChannelBuilder(config.values())

http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 44d6a77..c7d4044 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -35,6 +35,7 @@ import org.apache.kafka.common.protocol.Errors
 import scala.collection.JavaConverters._
 import com.yammer.metrics.core.Gauge
 import org.apache.kafka.common.requests.PartitionState
+import org.apache.kafka.common.utils.Time
 
 /**
  * Data structure that represents a topic partition. The leader maintains the AR, ISR, CUR, RAR

http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/main/scala/kafka/cluster/Replica.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Replica.scala b/core/src/main/scala/kafka/cluster/Replica.scala
index 13c1921..40cf181 100644
--- a/core/src/main/scala/kafka/cluster/Replica.scala
+++ b/core/src/main/scala/kafka/cluster/Replica.scala
@@ -18,15 +18,16 @@
 package kafka.cluster
 
 import kafka.log.Log
-import kafka.utils.{SystemTime, Time, Logging}
-import kafka.server.{LogReadResult, LogOffsetMetadata}
+import kafka.utils.Logging
+import kafka.server.{LogOffsetMetadata, LogReadResult}
 import kafka.common.KafkaException
-
 import java.util.concurrent.atomic.AtomicLong
 
+import org.apache.kafka.common.utils.Time
+
 class Replica(val brokerId: Int,
               val partition: Partition,
-              time: Time = SystemTime,
+              time: Time = Time.SYSTEM,
               initialHighWatermarkValue: Long = 0L,
               val log: Option[Log] = None) extends Logging {
   // the high watermark offset value, in non-leader replicas only its message offsets are kept

http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
index ef8190c..960f690 100644
--- a/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
+++ b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
@@ -18,10 +18,12 @@ package kafka.common
 
 import java.util.concurrent.atomic.AtomicBoolean
 
-import kafka.utils.{Time, SystemTime, ZkUtils, Logging}
+import kafka.utils.{Logging, ZkUtils}
 import org.apache.zookeeper.Watcher.Event.KeeperState
 import org.I0Itec.zkclient.exception.ZkInterruptedException
-import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener}
+import org.I0Itec.zkclient.{IZkChildListener, IZkStateListener}
+import org.apache.kafka.common.utils.Time
+
 import scala.collection.JavaConverters._
 
 /**
@@ -53,7 +55,7 @@ class ZkNodeChangeNotificationListener(private val zkUtils: ZkUtils,
                                        private val seqNodePrefix: String,
                                        private val notificationHandler: NotificationHandler,
                                        private val changeExpirationMs: Long = 15 * 60 * 1000,
-                                       private val time: Time = SystemTime) extends Logging {
+                                       private val time: Time = Time.SYSTEM) extends Logging {
   private var lastExecutedChange = -1L
   private val isClosed = new AtomicBoolean(false)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
index 5b5fe0d..dcdeb1e 100755
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
@@ -17,17 +17,20 @@
 
 package kafka.consumer
 
-import kafka.server.{BrokerAndInitialOffset, AbstractFetcherThread, AbstractFetcherManager}
+import kafka.server.{AbstractFetcherManager, AbstractFetcherThread, BrokerAndInitialOffset}
 import kafka.cluster.{BrokerEndPoint, Cluster}
 import org.apache.kafka.common.protocol.SecurityProtocol
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.utils.Time
+
 import scala.collection.immutable
 import collection.mutable.HashMap
 import scala.collection.mutable
 import java.util.concurrent.locks.ReentrantLock
+
 import kafka.utils.CoreUtils.inLock
 import kafka.utils.ZkUtils
-import kafka.utils.{ShutdownableThread, SystemTime}
+import kafka.utils.ShutdownableThread
 import kafka.client.ClientUtils
 import java.util.concurrent.atomic.AtomicInteger
 
@@ -39,7 +42,7 @@ import java.util.concurrent.atomic.AtomicInteger
 class ConsumerFetcherManager(private val consumerIdString: String,
                              private val config: ConsumerConfig,
                              private val zkUtils : ZkUtils)
-        extends AbstractFetcherManager("ConsumerFetcherManager-%d".format(SystemTime.milliseconds),
+        extends AbstractFetcherManager("ConsumerFetcherManager-%d".format(Time.SYSTEM.milliseconds),
                                        config.clientId, config.numConsumerFetchers) {
   private var partitionMap: immutable.Map[TopicPartition, PartitionTopicInfo] = null
   private var cluster: Cluster = null

http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index 22a0c9a..0b89477 100755
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -39,6 +39,7 @@ import org.I0Itec.zkclient.exception.ZkNodeExistsException
 import org.I0Itec.zkclient.{IZkChildListener, IZkDataListener, IZkStateListener}
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.security.JaasUtils
+import org.apache.kafka.common.utils.Time
 import org.apache.zookeeper.Watcher.Event.KeeperState
 
 import scala.collection._
@@ -271,7 +272,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
 
   private def registerConsumerInZK(dirs: ZKGroupDirs, consumerIdString: String, topicCount: TopicCount) {
     info("begin registering consumer " + consumerIdString + " in ZK")
-    val timestamp = SystemTime.milliseconds.toString
+    val timestamp = Time.SYSTEM.milliseconds.toString
     val consumerRegistrationInfo = Json.encode(Map("version" -> 1, "subscription" -> topicCount.getTopicCountMap, "pattern" -> topicCount.pattern,
                                                   "timestamp" -> timestamp))
     val zkWatchedEphemeral = new ZKCheckedEphemeral(dirs.

http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 1f6e19a..d3137c3 100755
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -160,7 +160,7 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
   val partitionStateMachine = new PartitionStateMachine(this)
   val replicaStateMachine = new ReplicaStateMachine(this)
   private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath, onControllerFailover,
-    onControllerResignation, config.brokerId)
+    onControllerResignation, config.brokerId, time)
   // have a separate scheduler for the controller to be able to start and stop independently of the
   // kafka server
   private val autoRebalanceScheduler = new KafkaScheduler(1)

http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/main/scala/kafka/log/FileMessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/FileMessageSet.scala b/core/src/main/scala/kafka/log/FileMessageSet.scala
index c33e376..506f5b9 100755
--- a/core/src/main/scala/kafka/log/FileMessageSet.scala
+++ b/core/src/main/scala/kafka/log/FileMessageSet.scala
@@ -21,12 +21,11 @@ import java.io._
 import java.nio._
 import java.nio.channels._
 import java.util.concurrent.atomic._
+import java.util.concurrent.TimeUnit
 
 import kafka.utils._
 import kafka.message._
 import kafka.common.KafkaException
-import java.util.concurrent.TimeUnit
-
 import kafka.metrics.{KafkaMetricsGroup, KafkaTimer}
 import org.apache.kafka.common.errors.CorruptRecordException
 import org.apache.kafka.common.record.FileRecords

http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 9e3dfac..24177d1 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -28,14 +28,14 @@ import java.util.concurrent.{ConcurrentNavigableMap, ConcurrentSkipListMap}
 import java.util.concurrent.atomic._
 import java.text.NumberFormat
 
-import org.apache.kafka.common.errors.{UnsupportedForMessageFormatException, CorruptRecordException, OffsetOutOfRangeException, RecordBatchTooLargeException, RecordTooLargeException}
+import org.apache.kafka.common.errors.{CorruptRecordException, OffsetOutOfRangeException, RecordBatchTooLargeException, RecordTooLargeException, UnsupportedForMessageFormatException}
 import org.apache.kafka.common.record.TimestampType
 import org.apache.kafka.common.requests.ListOffsetRequest
 
 import scala.collection.Seq
 import scala.collection.JavaConverters._
 import com.yammer.metrics.core.Gauge
-import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.utils.{Time, Utils}
 
 object LogAppendInfo {
   val UnknownLogAppendInfo = LogAppendInfo(-1, -1, Message.NoTimestamp, -1L, Message.NoTimestamp, NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false)
@@ -87,7 +87,7 @@ class Log(@volatile var dir: File,
           @volatile var config: LogConfig,
           @volatile var recoveryPoint: Long = 0L,
           scheduler: Scheduler,
-          time: Time = SystemTime) extends Logging with KafkaMetricsGroup {
+          time: Time = Time.SYSTEM) extends Logging with KafkaMetricsGroup {
 
   import kafka.log.Log._
 
@@ -103,20 +103,23 @@ class Log(@volatile var dir: File,
     else
       0
   }
-  val t = time.milliseconds
+
   /* the actual segments of the log */
   private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]
-  loadSegments()
+  locally {
+    val startMs = time.milliseconds
+    loadSegments()
+    info("Completed load of log %s with %d log segments and log end offset %d in %d ms"
+      .format(name, segments.size(), logEndOffset, time.milliseconds - startMs))
+  }
 
   /* Calculate the offset of the next message */
-  @volatile var nextOffsetMetadata = new LogOffsetMetadata(activeSegment.nextOffset(), activeSegment.baseOffset, activeSegment.size.toInt)
+  @volatile private var nextOffsetMetadata = new LogOffsetMetadata(activeSegment.nextOffset(), activeSegment.baseOffset,
+    activeSegment.size.toInt)
 
   val topicAndPartition: TopicAndPartition = Log.parseTopicPartitionName(dir)
 
-  info("Completed load of log %s with %d log segments and log end offset %d in %d ms"
-      .format(name, segments.size(), logEndOffset, time.milliseconds - t))
-
-  val tags = Map("topic" -> topicAndPartition.topic, "partition" -> topicAndPartition.partition.toString)
+  private val tags = Map("topic" -> topicAndPartition.topic, "partition" -> topicAndPartition.partition.toString)
 
   newGauge("NumLogSegments",
     new Gauge[Int] {

http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 17824ec..4a76b0c 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -27,6 +27,7 @@ import kafka.common._
 import kafka.message._
 import kafka.metrics.KafkaMetricsGroup
 import kafka.utils._
+import org.apache.kafka.common.utils.Time
 
 import scala.Iterable
 import scala.collection._
@@ -69,7 +70,7 @@ import scala.collection._
 class LogCleaner(val config: CleanerConfig,
                  val logDirs: Array[File],
                  val logs: Pool[TopicAndPartition, Log], 
-                 time: Time = SystemTime) extends Logging with KafkaMetricsGroup {
+                 time: Time = Time.SYSTEM) extends Logging with KafkaMetricsGroup {
   
   /* for managing the state of partitions being cleaned. package-private to allow access in tests */
   private[log] val cleanerManager = new LogCleanerManager(logDirs, logs)
@@ -648,7 +649,7 @@ private[log] class Cleaner(val id: Int,
 /**
  * A simple struct for collecting stats about log cleaning
  */
-private class CleanerStats(time: Time = SystemTime) {
+private class CleanerStats(time: Time = Time.SYSTEM) {
   val startTime = time.milliseconds
   var mapCompleteTime = -1L
   var endTime = -1L

http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/main/scala/kafka/log/LogCleanerManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index b808348..92cbf0f 100755
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -26,7 +26,8 @@ import kafka.common.{LogCleaningAbortedException, TopicAndPartition}
 import kafka.metrics.KafkaMetricsGroup
 import kafka.server.OffsetCheckpoint
 import kafka.utils.CoreUtils._
-import kafka.utils.{Logging, Pool, Time}
+import kafka.utils.{Logging, Pool}
+import org.apache.kafka.common.utils.Time
 
 import scala.collection.{immutable, mutable}
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 64b277a..ed79946 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -27,6 +27,7 @@ import scala.collection.JavaConverters._
 import kafka.common.{KafkaStorageException, KafkaException, TopicAndPartition}
 import kafka.server.{BrokerState, OffsetCheckpoint, RecoveringFromUncleanShutdown}
 import java.util.concurrent.{ExecutionException, ExecutorService, Executors, Future}
+import org.apache.kafka.common.utils.Time
 
 /**
  * The entry point to the kafka log management subsystem. The log manager is responsible for log creation, retrieval, and cleaning.
@@ -49,7 +50,7 @@ class LogManager(val logDirs: Array[File],
                  val retentionCheckMs: Long,
                  scheduler: Scheduler,
                  val brokerState: BrokerState,
-                 private val time: Time) extends Logging {
+                 time: Time) extends Logging {
   val RecoveryPointCheckpointFile = "recovery-point-offset-checkpoint"
   val LockFile = ".lock"
   val InitialTaskDelayMs = 30*1000

http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/main/scala/kafka/log/LogSegment.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index c63a7d6..c5418e3 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -19,12 +19,12 @@ package kafka.log
 import kafka.message._
 import kafka.common._
 import kafka.utils._
-import kafka.server.{LogOffsetMetadata, FetchDataInfo}
+import kafka.server.{FetchDataInfo, LogOffsetMetadata}
 import org.apache.kafka.common.errors.CorruptRecordException
+import org.apache.kafka.common.utils.Time
 
 import scala.math._
-import java.io.{IOException, File}
-
+import java.io.{File, IOException}
 
  /**
  * A segment of the log. Each segment has two components: a log and an index. The log is a FileMessageSet containing
@@ -50,7 +50,7 @@ class LogSegment(val log: FileMessageSet,
                  val rollJitterMs: Long,
                  time: Time) extends Logging {
 
-  var created = time.milliseconds
+  private var created = time.milliseconds
 
   /* the number of bytes since we last added an entry in the offset index */
   private var bytesSinceLastIndexEntry = 0

http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 0cece68..a19ad22 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -26,7 +26,7 @@ import com.yammer.metrics.core.Gauge
 import kafka.api.{ControlledShutdownRequest, RequestOrResponse}
 import kafka.metrics.KafkaMetricsGroup
 import kafka.server.QuotaId
-import kafka.utils.{Logging, SystemTime}
+import kafka.utils.Logging
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.InvalidRequestException
 import org.apache.kafka.common.network.Send
@@ -34,6 +34,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Protocol, SecurityProtocol}
 import org.apache.kafka.common.record.MemoryRecords
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.security.auth.KafkaPrincipal
+import org.apache.kafka.common.utils.Time
 import org.apache.log4j.Logger
 
 object RequestChannel extends Logging {
@@ -107,7 +108,7 @@ object RequestChannel extends Logging {
     trace("Processor %d received request : %s".format(processor, requestDesc(true)))
 
     def updateRequestMetrics() {
-      val endTimeMs = SystemTime.milliseconds
+      val endTimeMs = Time.SYSTEM.milliseconds
       // In some corner cases, apiLocalCompleteTimeMs may not be set when the request completes if the remote
       // processing time is really small. This value is set in KafkaApis from a request handling thread.
       // This may be read in a network thread before the actual update happens in KafkaApis which will cause us to
@@ -158,7 +159,7 @@ object RequestChannel extends Logging {
   }
 
   case class Response(processor: Int, request: Request, responseSend: Send, responseAction: ResponseAction) {
-    request.responseCompleteTimeMs = SystemTime.milliseconds
+    request.responseCompleteTimeMs = Time.SYSTEM.milliseconds
 
     def this(processor: Int, request: Request, responseSend: Send) =
       this(processor, request, responseSend, if (responseSend == null) NoOpAction else SendAction)
@@ -241,7 +242,7 @@ class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMe
   def receiveResponse(processor: Int): RequestChannel.Response = {
     val response = responseQueues(processor).poll()
     if (response != null)
-      response.request.responseDequeueTimeMs = SystemTime.milliseconds
+      response.request.responseDequeueTimeMs = Time.SYSTEM.milliseconds
     response
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
index f9591ad..380b1c8 100755
--- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
+++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
@@ -18,18 +18,20 @@
 package kafka.producer.async
 
 import kafka.common._
-import kafka.message.{NoCompressionCodec, Message, ByteBufferMessageSet}
+import kafka.message.{ByteBufferMessageSet, Message, NoCompressionCodec}
 import kafka.producer._
 import kafka.serializer.Encoder
 import kafka.utils._
 import org.apache.kafka.common.errors.{LeaderNotAvailableException, UnknownTopicOrPartitionException}
 import org.apache.kafka.common.protocol.Errors
+
 import scala.util.Random
-import scala.collection.{Seq, Map}
+import scala.collection.{Map, Seq}
 import scala.collection.mutable.{ArrayBuffer, HashMap, Set}
 import java.util.concurrent.atomic._
-import kafka.api.{TopicMetadata, ProducerRequest}
-import org.apache.kafka.common.utils.Utils
+
+import kafka.api.{ProducerRequest, TopicMetadata}
+import org.apache.kafka.common.utils.{Time, Utils}
 
 @deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0")
 class DefaultEventHandler[K,V](config: ProducerConfig,
@@ -38,7 +40,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
                                private val keyEncoder: Encoder[K],
                                private val producerPool: ProducerPool,
                                private val topicPartitionInfos: HashMap[String, TopicMetadata] = new HashMap[String, TopicMetadata],
-                               private val time: Time = SystemTime)
+                               private val time: Time = Time.SYSTEM)
   extends EventHandler[K,V] with Logging {
 
   val isSync = ("sync" == config.producerType)
@@ -69,11 +71,11 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
     while (remainingRetries > 0 && outstandingProduceRequests.nonEmpty) {
       topicMetadataToRefresh ++= outstandingProduceRequests.map(_.topic)
       if (topicMetadataRefreshInterval >= 0 &&
-          SystemTime.milliseconds - lastTopicMetadataRefreshTime > topicMetadataRefreshInterval) {
+          Time.SYSTEM.milliseconds - lastTopicMetadataRefreshTime > topicMetadataRefreshInterval) {
         CoreUtils.swallowError(brokerPartitionInfo.updateInfo(topicMetadataToRefresh.toSet, correlationId.getAndIncrement))
         sendPartitionPerTopicCache.clear()
         topicMetadataToRefresh.clear
-        lastTopicMetadataRefreshTime = SystemTime.milliseconds
+        lastTopicMetadataRefreshTime = Time.SYSTEM.milliseconds
       }
       outstandingProduceRequests = dispatchSerializedData(outstandingProduceRequests)
       if (outstandingProduceRequests.nonEmpty) {

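The metadata refresh condition above is a plain elapsed-time guard. A self-contained sketch of the same pattern, with an illustrative interval (not taken from the producer config):

    import org.apache.kafka.common.utils.Time

    val refreshIntervalMs = 10 * 60 * 1000L  // illustrative: at most every 10 minutes
    var lastRefreshMs = Time.SYSTEM.milliseconds

    def maybeRefresh(refresh: () => Unit): Unit = {
      if (Time.SYSTEM.milliseconds - lastRefreshMs > refreshIntervalMs) {
        refresh()
        lastRefreshMs = Time.SYSTEM.milliseconds
      }
    }
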
http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala b/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
index d423757..79ed1b8 100644
--- a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
+++ b/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
@@ -17,12 +17,14 @@
 
 package kafka.producer.async
 
-import kafka.utils.{SystemTime, Logging}
-import java.util.concurrent.{TimeUnit, CountDownLatch, BlockingQueue}
+import kafka.utils.Logging
+import java.util.concurrent.{BlockingQueue, CountDownLatch, TimeUnit}
+
 import collection.mutable.ArrayBuffer
 import kafka.producer.KeyedMessage
 import kafka.metrics.KafkaMetricsGroup
 import com.yammer.metrics.core.Gauge
+import org.apache.kafka.common.utils.Time
 
 @deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0")
 class ProducerSendThread[K,V](val threadName: String,
@@ -59,15 +61,15 @@ class ProducerSendThread[K,V](val threadName: String,
   }
 
   private def processEvents() {
-    var lastSend = SystemTime.milliseconds
+    var lastSend = Time.SYSTEM.milliseconds
     var events = new ArrayBuffer[KeyedMessage[K,V]]
     var full: Boolean = false
 
     // drain the queue until you get a shutdown command
-    Iterator.continually(queue.poll(scala.math.max(0, (lastSend + queueTime) - SystemTime.milliseconds), TimeUnit.MILLISECONDS))
+    Iterator.continually(queue.poll(scala.math.max(0, (lastSend + queueTime) - Time.SYSTEM.milliseconds), TimeUnit.MILLISECONDS))
                       .takeWhile(item => if(item != null) item ne shutdownCommand else true).foreach {
       currentQueueItem =>
-        val elapsed = (SystemTime.milliseconds - lastSend)
+        val elapsed = Time.SYSTEM.milliseconds - lastSend
        // check if the queue time has been reached. This happens when the poll method above
        // times out and returns null
         val expired = currentQueueItem == null
@@ -87,7 +89,7 @@ class ProducerSendThread[K,V](val threadName: String,
             debug("Batch full. Sending..")
           // if either queue time has reached or batch size has reached, dispatch to event handler
           tryToHandle(events)
-          lastSend = SystemTime.milliseconds
+          lastSend = Time.SYSTEM.milliseconds
           events = new ArrayBuffer[KeyedMessage[K,V]]
         }
     }

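The poll timeout in processEvents anchors each batch window to the last send rather than to the last poll. A standalone sketch of that deadline computation, assuming a hypothetical BlockingQueue[String] and a 500 ms queue time:

    import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
    import org.apache.kafka.common.utils.Time

    val queue = new LinkedBlockingQueue[String]()
    val queueTimeMs = 500L
    val lastSend = Time.SYSTEM.milliseconds

    // Wait only for the remainder of the current batch window; a null result
    // means the window expired and the accumulated batch should be dispatched.
    val remainingMs = math.max(0, (lastSend + queueTimeMs) - Time.SYSTEM.milliseconds)
    val item = queue.poll(remainingMs, TimeUnit.MILLISECONDS)
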
http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/main/scala/kafka/server/DelayedOperation.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedOperation.scala b/core/src/main/scala/kafka/server/DelayedOperation.scala
index 5248edf..dbee092 100644
--- a/core/src/main/scala/kafka/server/DelayedOperation.scala
+++ b/core/src/main/scala/kafka/server/DelayedOperation.scala
@@ -27,8 +27,6 @@ import java.util.concurrent._
 import java.util.concurrent.atomic._
 import java.util.concurrent.locks.ReentrantReadWriteLock
 
-import org.apache.kafka.common.utils.Utils
-
 import scala.collection._
 
 import com.yammer.metrics.core.Gauge

http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/main/scala/kafka/server/DynamicConfigManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DynamicConfigManager.scala b/core/src/main/scala/kafka/server/DynamicConfigManager.scala
index 2e9e714..e0e6a03 100644
--- a/core/src/main/scala/kafka/server/DynamicConfigManager.scala
+++ b/core/src/main/scala/kafka/server/DynamicConfigManager.scala
@@ -18,18 +18,13 @@
 package kafka.server
 
 import kafka.common.{NotificationHandler, ZkNodeChangeNotificationListener}
-import kafka.security.auth.Resource
 import kafka.utils.Json
 import kafka.utils.Logging
-import kafka.utils.SystemTime
-import kafka.utils.Time
 import kafka.utils.ZkUtils
-import org.apache.zookeeper.Watcher.Event.KeeperState
 
 import scala.collection._
 import kafka.admin.AdminUtils
-import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener, ZkClient}
-
+import org.apache.kafka.common.utils.Time
 
 /**
  * Represents all the entities that can be configured via ZK
@@ -87,7 +82,7 @@ object ConfigEntityName {
 class DynamicConfigManager(private val zkUtils: ZkUtils,
                            private val configHandlers: Map[String, ConfigHandler],
                            private val changeExpirationMs: Long = 15*60*1000,
-                           private val time: Time = SystemTime) extends Logging {
+                           private val time: Time = Time.SYSTEM) extends Logging {
 
   object ConfigChangedNotificationHandler extends NotificationHandler {
     override def processNotification(json: String) = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 296beb3..fa3db5c 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -30,19 +30,19 @@ import kafka.common._
 import kafka.controller.KafkaController
 import kafka.coordinator.{GroupCoordinator, JoinGroupResult}
 import kafka.log._
-import kafka.message.{ByteBufferMessageSet, Message, MessageSet}
+import kafka.message.{ByteBufferMessageSet, Message}
 import kafka.network._
 import kafka.network.RequestChannel.{Response, Session}
 import kafka.security.auth
 import kafka.security.auth.{Authorizer, ClusterAction, Create, Delete, Describe, Group, Operation, Read, Resource, Write}
-import kafka.utils.{Logging, SystemTime, ZKGroupTopicDirs, ZkUtils}
+import kafka.utils.{Logging, ZKGroupTopicDirs, ZkUtils}
 import org.apache.kafka.common.errors.{ClusterAuthorizationException, NotLeaderForPartitionException, TopicExistsException, UnknownTopicOrPartitionException, UnsupportedForMessageFormatException}
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.{ApiKeys, Errors, Protocol, SecurityProtocol}
 import org.apache.kafka.common.record.MemoryRecords
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
-import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.common.{Node, TopicPartition}
 import org.apache.kafka.common.requests.SaslHandshakeResponse
 
@@ -64,7 +64,8 @@ class KafkaApis(val requestChannel: RequestChannel,
                 val metrics: Metrics,
                 val authorizer: Option[Authorizer],
                 val quotas: QuotaManagers,
-                val clusterId: String) extends Logging {
+                val clusterId: String,
+                time: Time) extends Logging {
 
   this.logIdent = "[KafkaApi-%d] ".format(brokerId)
 
@@ -117,7 +118,7 @@ class KafkaApis(val requestChannel: RequestChannel,
           error("Error when handling request %s".format(request.body), e)
         }
     } finally
-      request.apiLocalCompleteTimeMs = SystemTime.milliseconds
+      request.apiLocalCompleteTimeMs = time.milliseconds
   }
 
   def handleLeaderAndIsrRequest(request: RequestChannel.Request) {
@@ -308,7 +309,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         //   - If v1 and no explicit commit timestamp is provided we use default expiration timestamp.
         //   - If v1 and explicit commit timestamp is provided we calculate retention from that explicit commit timestamp
         //   - If v2 we use the default expiration timestamp
-        val currentTimestamp = SystemTime.milliseconds
+        val currentTimestamp = time.milliseconds
         val defaultExpireTimestamp = offsetRetention + currentTimestamp
         val partitionData = authorizedTopics.mapValues { partitionData =>
           val metadata = if (partitionData.metadata == null) OffsetMetadata.NoMetadata else partitionData.metadata
@@ -407,7 +408,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
 
       // When this callback is triggered, the remote API call has completed
-      request.apiRemoteCompleteTimeMs = SystemTime.milliseconds
+      request.apiRemoteCompleteTimeMs = time.milliseconds
 
       quotas.produce.recordAndMaybeThrottle(
         request.session.sanitizedUser,
@@ -515,7 +516,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
 
       // When this callback is triggered, the remote API call has completed
-      request.apiRemoteCompleteTimeMs = SystemTime.milliseconds
+      request.apiRemoteCompleteTimeMs = time.milliseconds
 
       if (fetchRequest.isFromFollower) {
         // We've already evaluated against the quota and are good to go. Just need to record it now.
@@ -727,7 +728,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     for (i <- segsArray.indices)
       offsetTimeArray(i) = (segsArray(i).baseOffset, segsArray(i).lastModified)
     if (lastSegmentHasSize)
-      offsetTimeArray(segsArray.length) = (log.logEndOffset, SystemTime.milliseconds)
+      offsetTimeArray(segsArray.length) = (log.logEndOffset, time.milliseconds)
 
     var startIndex = -1
     timestamp match {

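The offset retention cases listed in the comment above reduce to a single addition. A worked sketch with illustrative values (not taken from the patch):

    import org.apache.kafka.common.utils.Time

    val offsetRetention = 24 * 60 * 60 * 1000L     // illustrative: 24 hours in ms
    val currentTimestamp = Time.SYSTEM.milliseconds
    val defaultExpireTimestamp = offsetRetention + currentTimestamp
    // v2 commits, and v1 commits without an explicit timestamp, expire at
    // defaultExpireTimestamp; a v1 explicit commit timestamp replaces it.
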
http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
index f70955d..97145b4 100755
--- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
+++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
@@ -21,8 +21,9 @@ import kafka.network._
 import kafka.utils._
 import kafka.metrics.KafkaMetricsGroup
 import java.util.concurrent.TimeUnit
+
 import com.yammer.metrics.core.Meter
-import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.utils.{Time, Utils}
 
 /**
 * A thread that answers Kafka requests.
@@ -32,7 +33,8 @@ class KafkaRequestHandler(id: Int,
                           val aggregateIdleMeter: Meter,
                           val totalHandlerThreads: Int,
                           val requestChannel: RequestChannel,
-                          apis: KafkaApis) extends Runnable with Logging {
+                          apis: KafkaApis,
+                          time: Time) extends Runnable with Logging {
   this.logIdent = "[Kafka Request Handler " + id + " on Broker " + brokerId + "], "
 
   def run() {
@@ -44,9 +46,9 @@ class KafkaRequestHandler(id: Int,
          // Since the meter is calculated as total_recorded_value / time_window and
           // time_window is independent of the number of threads, each recorded idle
           // time should be discounted by # threads.
-          val startSelectTime = SystemTime.nanoseconds
+          val startSelectTime = time.nanoseconds
           req = requestChannel.receiveRequest(300)
-          val idleTime = SystemTime.nanoseconds - startSelectTime
+          val idleTime = time.nanoseconds - startSelectTime
           aggregateIdleMeter.mark(idleTime / totalHandlerThreads)
         }
 
@@ -55,7 +57,7 @@ class KafkaRequestHandler(id: Int,
             id, brokerId))
           return
         }
-        req.requestDequeueTimeMs = SystemTime.milliseconds
+        req.requestDequeueTimeMs = time.milliseconds
         trace("Kafka request handler %d on broker %d handling request %s".format(id, brokerId, req))
         apis.handle(req)
       } catch {
@@ -70,6 +72,7 @@ class KafkaRequestHandler(id: Int,
 class KafkaRequestHandlerPool(val brokerId: Int,
                               val requestChannel: RequestChannel,
                               val apis: KafkaApis,
+                              time: Time,
                               numThreads: Int) extends Logging with KafkaMetricsGroup {
 
   /* a meter to track the average free capacity of the request handlers */
@@ -79,7 +82,7 @@ class KafkaRequestHandlerPool(val brokerId: Int,
   val threads = new Array[Thread](numThreads)
   val runnables = new Array[KafkaRequestHandler](numThreads)
   for(i <- 0 until numThreads) {
-    runnables(i) = new KafkaRequestHandler(i, brokerId, aggregateIdleMeter, numThreads, requestChannel, apis)
+    runnables(i) = new KafkaRequestHandler(i, brokerId, aggregateIdleMeter, numThreads, requestChannel, apis, time)
     threads(i) = Utils.daemonThread("kafka-request-handler-" + i, runnables(i))
     threads(i).start()
   }

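The idle-time discounting in the handler loop above is easiest to verify with concrete numbers (illustrative, not from the patch):

    // 8 handler threads, each measuring 400 ms of idle time in a 1 s metering
    // window. Marking the raw samples would report 8 * 400 ms = 3200 ms of idle
    // time per second (320%); dividing each sample by the thread count keeps
    // the aggregate meter at the true average of 400 ms per second (40%).
    val totalHandlerThreads = 8
    val idlePerThreadMs = 400L
    val markedPerThread = idlePerThreadMs / totalHandlerThreads     // 50 ms
    val aggregatePerWindow = markedPerThread * totalHandlerThreads  // 400 ms
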
http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 4026a7e..bbddfae 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -43,7 +43,7 @@ import org.apache.kafka.common.network._
 import org.apache.kafka.common.protocol.{ApiKeys, Errors, SecurityProtocol}
 import org.apache.kafka.common.requests.{ControlledShutdownRequest, ControlledShutdownResponse}
 import org.apache.kafka.common.security.JaasUtils
-import org.apache.kafka.common.utils.AppInfoParser
+import org.apache.kafka.common.utils.{AppInfoParser, Time}
 import org.apache.kafka.common.{ClusterResource, Node}
 
 import scala.collection.JavaConverters._
@@ -86,7 +86,7 @@ object KafkaServer {
  * Represents the lifecycle of a single Kafka broker. Handles all functionality required
 * to start up and shut down a single Kafka node.
  */
-class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePrefix: Option[String] = None, kafkaMetricsReporters: Seq[KafkaMetricsReporter] = List()) extends Logging with KafkaMetricsGroup {
+class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNamePrefix: Option[String] = None, kafkaMetricsReporters: Seq[KafkaMetricsReporter] = List()) extends Logging with KafkaMetricsGroup {
   private val startupComplete = new AtomicBoolean(false)
   private val isShuttingDown = new AtomicBoolean(false)
   private val isStartingUp = new AtomicBoolean(false)
@@ -97,10 +97,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
   private val reporters: java.util.List[MetricsReporter] = config.metricReporterClasses
   reporters.add(new JmxReporter(jmxPrefix))
 
-  // This exists because the Metrics package from clients has its own Time implementation.
-  // SocketServer/Quotas (which uses client libraries) have to use the client Time objects without having to convert all of Kafka to use them
-  // Eventually, we want to merge the Time objects in core and clients
-  private implicit val kafkaMetricsTime: org.apache.kafka.common.utils.Time = new org.apache.kafka.common.utils.SystemTime()
   var metrics: Metrics = null
 
   private val metricConfig: MetricConfig = new MetricConfig()
@@ -180,7 +176,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
 
       val canStartup = isStartingUp.compareAndSet(false, true)
       if (canStartup) {
-        metrics = new Metrics(metricConfig, reporters, kafkaMetricsTime, true)
+        metrics = new Metrics(metricConfig, reporters, time, true)
         quotaManagers = QuotaFactory.instantiate(config, metrics, time)
 
         brokerState.newState(Starting)
@@ -207,22 +203,23 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
 
         metadataCache = new MetadataCache(config.brokerId)
 
-        socketServer = new SocketServer(config, metrics, kafkaMetricsTime)
+        socketServer = new SocketServer(config, metrics, time)
         socketServer.startup()
 
         /* start replica manager */
-        replicaManager = new ReplicaManager(config, metrics, time, kafkaMetricsTime, zkUtils, kafkaScheduler, logManager,
+        replicaManager = new ReplicaManager(config, metrics, time, zkUtils, kafkaScheduler, logManager,
           isShuttingDown, quotaManagers.follower)
         replicaManager.startup()
 
         /* start kafka controller */
-        kafkaController = new KafkaController(config, zkUtils, brokerState, kafkaMetricsTime, metrics, threadNamePrefix)
+        kafkaController = new KafkaController(config, zkUtils, brokerState, time, metrics, threadNamePrefix)
         kafkaController.startup()
 
         adminManager = new AdminManager(config, metrics, metadataCache, zkUtils)
 
         /* start group coordinator */
-        groupCoordinator = GroupCoordinator(config, zkUtils, replicaManager, kafkaMetricsTime)
+        // Hardcode Time.SYSTEM for now, as some Streams tests fail otherwise; it would be good to fix the underlying issue
+        groupCoordinator = GroupCoordinator(config, zkUtils, replicaManager, Time.SYSTEM)
         groupCoordinator.startup()
 
         /* Get the authorizer and initialize it if one is specified.*/
@@ -234,9 +231,11 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
 
         /* start processing requests */
         apis = new KafkaApis(socketServer.requestChannel, replicaManager, adminManager, groupCoordinator,
-          kafkaController, zkUtils, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers, clusterId)
+          kafkaController, zkUtils, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,
+          clusterId, time)
 
-        requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
+        requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, time,
+          config.numIoThreads)
 
         Mx4jLoader.maybeLoad()
 
@@ -363,7 +362,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
           NetworkReceive.UNLIMITED,
           config.connectionsMaxIdleMs,
           metrics,
-          kafkaMetricsTime,
+          time,
           "kafka-server-controlled-shutdown",
           Map.empty.asJava,
           false,
@@ -378,7 +377,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
           Selectable.USE_DEFAULT_BUFFER_SIZE,
           Selectable.USE_DEFAULT_BUFFER_SIZE,
           config.requestTimeoutMs,
-          kafkaMetricsTime)
+          time)
       }
 
       var shutdownSucceeded: Boolean = false
@@ -420,15 +419,15 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
           if (prevController != null) {
             try {
 
-              if (!networkClient.blockingReady(node(prevController), socketTimeoutMs))
+              if (!networkClient.blockingReady(node(prevController), socketTimeoutMs)(time))
                 throw new SocketTimeoutException(s"Failed to connect within $socketTimeoutMs ms")
 
               // send the controlled shutdown request
               val requestHeader = networkClient.nextRequestHeader(ApiKeys.CONTROLLED_SHUTDOWN_KEY)
               val controlledShutdownRequest = new ControlledShutdownRequest(config.brokerId)
-              val request = new ClientRequest(node(prevController).idString, kafkaMetricsTime.milliseconds(), true,
+              val request = new ClientRequest(node(prevController).idString, time.milliseconds(), true,
                 requestHeader, controlledShutdownRequest, null)
-              val clientResponse = networkClient.blockingSendAndReceive(request, controlledShutdownRequest)
+              val clientResponse = networkClient.blockingSendAndReceive(request, controlledShutdownRequest)(time)
 
               val shutdownResponse = clientResponse.responseBody.asInstanceOf[ControlledShutdownResponse]
               if (shutdownResponse.errorCode == Errors.NONE.code && shutdownResponse.partitionsRemaining.isEmpty) {

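A payoff of threading one Time instance through KafkaServer is that tests can drive the broker from a mock clock. A sketch, assuming the MockTime implementation from the clients test module (whose sleep advances the mock clock instead of blocking) and a KafkaConfig already in scope:

    import org.apache.kafka.common.utils.MockTime

    val time = new MockTime()
    val server = new KafkaServer(config, time = time)
    server.startup()
    time.sleep(30 * 1000)  // advance millis and nanos deterministically
    server.shutdown()
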
http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index d2ec200..af64ffe 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -36,8 +36,8 @@ import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{LeaderAndIsrRequest, PartitionState, StopReplicaRequest, UpdateMetadataRequest}
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
+import org.apache.kafka.common.utils.Time
 import org.apache.kafka.common.requests.FetchRequest.PartitionData
-import org.apache.kafka.common.utils.{Time => JTime}
 
 import scala.collection._
 import scala.collection.JavaConverters._
@@ -102,7 +102,6 @@ object ReplicaManager {
 class ReplicaManager(val config: KafkaConfig,
                      metrics: Metrics,
                      time: Time,
-                     jTime: JTime,
                      val zkUtils: ZkUtils,
                      scheduler: Scheduler,
                      val logManager: LogManager,
@@ -116,7 +115,7 @@ class ReplicaManager(val config: KafkaConfig,
     new Partition(t, p, time, this)
   })
   private val replicaStateChangeLock = new Object
-  val replicaFetcherManager = new ReplicaFetcherManager(config, this, metrics, jTime, threadNamePrefix, quotaManager)
+  val replicaFetcherManager = new ReplicaFetcherManager(config, this, metrics, time, threadNamePrefix, quotaManager)
   private val highWatermarkCheckPointThreadStarted = new AtomicBoolean(false)
   val highWatermarkCheckpoints = config.logDirs.map(dir => (new File(dir).getAbsolutePath, new OffsetCheckpoint(new File(dir, ReplicaManager.HighWatermarkFilename)))).toMap
   private var hwThreadInitialized = false
@@ -326,9 +325,9 @@ class ReplicaManager(val config: KafkaConfig,
                      responseCallback: Map[TopicPartition, PartitionResponse] => Unit) {
 
     if (isValidRequiredAcks(requiredAcks)) {
-      val sTime = SystemTime.milliseconds
+      val sTime = time.milliseconds
       val localProduceResults = appendToLocalLog(internalTopicsAllowed, messagesPerPartition, requiredAcks)
-      debug("Produce to local log in %d ms".format(SystemTime.milliseconds - sTime))
+      debug("Produce to local log in %d ms".format(time.milliseconds - sTime))
 
       val produceStatus = localProduceResults.map { case (topicPartition, result) =>
         topicPartition ->

http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
index 73e7210..d9e2b5b 100755
--- a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
+++ b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
@@ -18,12 +18,13 @@ package kafka.server
 
 import kafka.utils.ZkUtils._
 import kafka.utils.CoreUtils._
-import kafka.utils.{Json, SystemTime, Logging, ZKCheckedEphemeral}
+import kafka.utils.{Json, Logging, ZKCheckedEphemeral}
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
 import org.I0Itec.zkclient.IZkDataListener
 import kafka.controller.ControllerContext
 import kafka.controller.KafkaController
 import org.apache.kafka.common.security.JaasUtils
+import org.apache.kafka.common.utils.Time
 
 /**
 * This class handles ZooKeeper-based leader election using an ephemeral path. The election module does not handle
@@ -35,7 +36,8 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext,
                              electionPath: String,
                              onBecomingLeader: () => Unit,
                              onResigningAsLeader: () => Unit,
-                             brokerId: Int)
+                             brokerId: Int,
+                             time: Time)
   extends LeaderElector with Logging {
   var leaderId = -1
   // create the election path in ZK, if one does not exist
@@ -59,7 +61,7 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext,
   }
 
   def elect: Boolean = {
-    val timestamp = SystemTime.milliseconds.toString
+    val timestamp = time.milliseconds.toString
     val electString = Json.encode(Map("version" -> 1, "brokerid" -> brokerId, "timestamp" -> timestamp))
    
    leaderId = getControllerID 

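For reference, the election payload written by elect above is a small JSON document; with brokerId = 1 it looks something like:

    {"version":1,"brokerid":1,"timestamp":"1480687234000"}
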
http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
index 01d3aa8..479b43c 100644
--- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
+++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
@@ -19,18 +19,22 @@ package kafka.tools
 
 import joptsimple.OptionParser
 import kafka.cluster.BrokerEndPoint
-import kafka.message.{MessageSet, MessageAndOffset, ByteBufferMessageSet}
+import kafka.message.{ByteBufferMessageSet, MessageAndOffset, MessageSet}
 import java.util.concurrent.CountDownLatch
 import java.util.concurrent.atomic.AtomicReference
+
 import kafka.client.ClientUtils
-import java.util.regex.{PatternSyntaxException, Pattern}
+import java.util.regex.{Pattern, PatternSyntaxException}
+
 import kafka.api._
 import java.text.SimpleDateFormat
 import java.util.Date
+
 import kafka.common.TopicAndPartition
 import kafka.utils._
-import kafka.consumer.{ConsumerConfig, Whitelist, SimpleConsumer}
+import kafka.consumer.{ConsumerConfig, SimpleConsumer, Whitelist}
 import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.utils.Time
 
 /**
  *  For verifying the consistency among replicas.
@@ -59,7 +63,7 @@ object ReplicaVerificationTool extends Logging {
   val dateFormat = new SimpleDateFormat(dateFormatString)
 
   def getCurrentTimeString() = {
-    ReplicaVerificationTool.dateFormat.format(new Date(SystemTime.milliseconds))
+    ReplicaVerificationTool.dateFormat.format(new Date(Time.SYSTEM.milliseconds))
   }
 
   def main(args: Array[String]): Unit = {
@@ -210,7 +214,7 @@ private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPa
   private val messageSetCache = new Pool[TopicAndPartition, Pool[Int, FetchResponsePartitionData]]
   private val fetcherBarrier = new AtomicReference(new CountDownLatch(expectedNumFetchers))
   private val verificationBarrier = new AtomicReference(new CountDownLatch(1))
-  @volatile private var lastReportTime = SystemTime.milliseconds
+  @volatile private var lastReportTime = Time.SYSTEM.milliseconds
   private var maxLag: Long = -1L
   private var offsetWithMaxLag: Long = -1L
   private var maxLagTopicAndPartition: TopicAndPartition = null
@@ -331,7 +335,7 @@ private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPa
       }
       fetchResponsePerReplica.clear()
     }
-    val currentTimeMs = SystemTime.milliseconds
+    val currentTimeMs = Time.SYSTEM.milliseconds
     if (currentTimeMs - lastReportTime > reportInterval) {
       println(ReplicaVerificationTool.dateFormat.format(new Date(currentTimeMs)) + ": max lag is "
         + maxLag + " for partition " + maxLagTopicAndPartition + " at offset " + offsetWithMaxLag

http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala b/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala
index 3abbc40..69b6ee8 100644
--- a/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala
+++ b/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala
@@ -19,11 +19,13 @@ package kafka.tools
 
 import java.net.URI
 import java.text.SimpleDateFormat
-import kafka.api.{PartitionOffsetRequestInfo, FetchRequestBuilder, OffsetRequest}
+
+import kafka.api.{FetchRequestBuilder, OffsetRequest, PartitionOffsetRequestInfo}
 import kafka.consumer.SimpleConsumer
 import kafka.utils._
 import org.apache.log4j.Logger
 import kafka.common.TopicAndPartition
+import org.apache.kafka.common.utils.Time
 
 
 /**
@@ -96,7 +98,7 @@ object SimpleConsumerPerformance {
             (totalBytesRead*1.0)/(1024*1024), totalMBRead/elapsed,
             totalMessagesRead, (totalMessagesRead-lastMessagesRead)/elapsed))
         }
-        lastReportTime = SystemTime.milliseconds
+        lastReportTime = Time.SYSTEM.milliseconds
         lastBytesRead = totalBytesRead
         lastMessagesRead = totalMessagesRead
         consumedInterval = 0