You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2016/12/02 14:01:34 UTC
[3/3] kafka git commit: KAFKA-2247;
Merge kafka.utils.Time and kafka.common.utils.Time
KAFKA-2247; Merge kafka.utils.Time and kafka.common.utils.Time
Also:
* Make all implementations of `Time` thread-safe as they are accessed from multiple threads in some cases.
* Change default implementation of `MockTime` to use two separate variables for `nanoTime` and `currentTimeMillis` as they have different `origins`.
Author: Ismael Juma <is...@juma.me.uk>
Reviewers: Ewen Cheslack-Postava <ew...@confluent.io>, Shikhar Bhushan <sh...@confluent.io>, Jason Gustafson <ja...@confluent.io>, Eno Thereska <en...@gmail.com>, Damian Guy <da...@gmail.com>
Closes #2095 from ijuma/kafka-2247-consolidate-time-interfaces
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/128d0ff9
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/128d0ff9
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/128d0ff9
Branch: refs/heads/trunk
Commit: 128d0ff91d84a3a1f5a5237133f9ec01caf18d66
Parents: ea370be
Author: Ismael Juma <is...@juma.me.uk>
Authored: Fri Dec 2 14:00:58 2016 +0000
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Fri Dec 2 14:00:58 2016 +0000
----------------------------------------------------------------------
.../kafka/clients/consumer/KafkaConsumer.java | 3 +-
.../kafka/clients/producer/KafkaProducer.java | 5 +-
.../apache/kafka/common/metrics/Metrics.java | 3 +-
.../common/security/kerberos/KerberosLogin.java | 5 +-
.../apache/kafka/common/utils/SystemTime.java | 10 ++-
.../org/apache/kafka/common/utils/Time.java | 23 ++++++-
.../producer/internals/BufferPoolTest.java | 11 ++--
.../internals/RecordAccumulatorTest.java | 5 +-
.../org/apache/kafka/common/utils/MockTime.java | 39 +++++++++---
.../org/apache/kafka/test/Microbenchmarks.java | 4 +-
.../kafka/connect/cli/ConnectDistributed.java | 3 +-
.../kafka/connect/cli/ConnectStandalone.java | 3 +-
.../storage/KafkaConfigBackingStore.java | 4 +-
.../storage/KafkaOffsetBackingStore.java | 4 +-
.../connect/runtime/WorkerSourceTaskTest.java | 4 +-
.../org/apache/kafka/connect/util/MockTime.java | 16 +++--
.../main/scala/kafka/admin/AdminClient.scala | 7 +--
.../main/scala/kafka/cluster/Partition.scala | 1 +
core/src/main/scala/kafka/cluster/Replica.scala | 9 +--
.../ZkNodeChangeNotificationListener.scala | 8 ++-
.../kafka/consumer/ConsumerFetcherManager.scala | 9 ++-
.../consumer/ZookeeperConsumerConnector.scala | 3 +-
.../kafka/controller/KafkaController.scala | 2 +-
.../main/scala/kafka/log/FileMessageSet.scala | 3 +-
core/src/main/scala/kafka/log/Log.scala | 23 ++++---
core/src/main/scala/kafka/log/LogCleaner.scala | 5 +-
.../scala/kafka/log/LogCleanerManager.scala | 3 +-
core/src/main/scala/kafka/log/LogManager.scala | 3 +-
core/src/main/scala/kafka/log/LogSegment.scala | 8 +--
.../scala/kafka/network/RequestChannel.scala | 9 +--
.../producer/async/DefaultEventHandler.scala | 16 ++---
.../producer/async/ProducerSendThread.scala | 14 +++--
.../scala/kafka/server/DelayedOperation.scala | 2 -
.../kafka/server/DynamicConfigManager.scala | 9 +--
.../src/main/scala/kafka/server/KafkaApis.scala | 19 +++---
.../kafka/server/KafkaRequestHandler.scala | 15 +++--
.../main/scala/kafka/server/KafkaServer.scala | 35 +++++------
.../scala/kafka/server/ReplicaManager.scala | 9 ++-
.../kafka/server/ZookeeperLeaderElector.scala | 8 ++-
.../kafka/tools/ReplicaVerificationTool.scala | 16 +++--
.../kafka/tools/SimpleConsumerPerformance.scala | 6 +-
.../main/scala/kafka/utils/DelayedItem.scala | 12 ++--
.../kafka/utils/NetworkClientBlockingOps.scala | 10 +--
core/src/main/scala/kafka/utils/Throttler.scala | 31 ++++++----
core/src/main/scala/kafka/utils/Time.scala | 65 --------------------
core/src/main/scala/kafka/utils/ZkUtils.scala | 3 +-
.../main/scala/kafka/utils/timer/Timer.scala | 7 +--
.../scala/kafka/utils/timer/TimerTaskList.scala | 9 +--
.../other/kafka/TestLinearWriteSpeed.scala | 4 +-
.../other/kafka/TestPurgatoryPerformance.scala | 5 +-
.../api/RequestResponseSerializationTest.scala | 14 ++---
.../ZkNodeChangeNotificationListenerTest.scala | 2 +-
.../controller/ControllerFailoverTest.scala | 6 +-
.../unit/kafka/log/BrokerCompressionTest.scala | 2 +-
.../log/LogCleanerLagIntegrationTest.scala | 2 +-
.../unit/kafka/log/LogCleanerManagerTest.scala | 2 +-
.../scala/unit/kafka/log/LogSegmentTest.scala | 11 ++--
.../unit/kafka/network/SocketServerTest.scala | 12 ++--
.../unit/kafka/producer/AsyncProducerTest.scala | 2 +
.../unit/kafka/producer/ProducerTest.scala | 5 +-
.../unit/kafka/producer/SyncProducerTest.scala | 13 ++--
.../kafka/server/DelayedOperationTest.scala | 6 +-
.../server/HighwatermarkPersistenceTest.scala | 21 ++++---
.../unit/kafka/server/ISRExpirationTest.scala | 11 ++--
.../unit/kafka/server/LeaderElectionTest.scala | 4 +-
.../scala/unit/kafka/server/LogOffsetTest.scala | 2 +-
.../unit/kafka/server/OffsetCommitTest.scala | 9 +--
.../kafka/server/ReplicaManagerQuotasTest.scala | 6 +-
.../unit/kafka/server/ReplicaManagerTest.scala | 18 +++---
.../unit/kafka/server/SimpleFetchTest.scala | 6 +-
.../scala/unit/kafka/utils/MockScheduler.scala | 2 +
.../test/scala/unit/kafka/utils/MockTime.scala | 43 ++++---------
.../test/scala/unit/kafka/utils/TestUtils.scala | 5 +-
.../org/apache/kafka/streams/KafkaStreams.java | 4 +-
.../state/internals/MeteredKeyValueStore.java | 3 +-
.../state/internals/MeteredWindowStore.java | 3 +-
.../integration/RegexSourceIntegrationTest.java | 9 ++-
.../integration/ResetIntegrationTest.java | 47 ++++++++------
.../integration/utils/EmbeddedKafkaCluster.java | 25 +++++---
.../integration/utils/IntegrationTestUtils.java | 2 +-
.../internals/StreamPartitionAssignorTest.java | 30 ++++-----
.../processor/internals/StreamThreadTest.java | 4 +-
.../StreamThreadStateStoreProviderTest.java | 6 +-
83 files changed, 435 insertions(+), 432 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index e273a04..93aa739 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -40,7 +40,6 @@ import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.AppInfoParser;
-import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
@@ -604,7 +603,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
int fetchMaxWaitMs = config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG);
if (this.requestTimeoutMs <= sessionTimeOutMs || this.requestTimeoutMs <= fetchMaxWaitMs)
throw new ConfigException(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG + " should be greater than " + ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG + " and " + ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG);
- this.time = new SystemTime();
+ this.time = Time.SYSTEM;
String clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG);
if (clientId.length() <= 0)
http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 54a5474..9dd8459 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -45,7 +45,6 @@ import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.KafkaThread;
-import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -208,7 +207,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
log.trace("Starting the Kafka producer");
Map<String, Object> userProvidedConfigs = config.originals();
this.producerConfig = config;
- this.time = new SystemTime();
+ this.time = Time.SYSTEM;
clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
if (clientId.length() <= 0)
@@ -317,7 +316,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
(short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)),
config.getInt(ProducerConfig.RETRIES_CONFIG),
this.metrics,
- new SystemTime(),
+ Time.SYSTEM,
this.requestTimeoutMs);
String ioThreadName = "kafka-producer-network-thread" + (clientId.length() > 0 ? " | " + clientId : "");
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
index afca2e5..bd20e13 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
@@ -26,7 +26,6 @@ import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.MetricName;
-import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
@@ -97,7 +96,7 @@ public class Metrics implements Closeable {
* @param defaultConfig The default config to use for all metrics that don't override their config
*/
public Metrics(MetricConfig defaultConfig) {
- this(defaultConfig, new ArrayList<MetricsReporter>(0), new SystemTime());
+ this(defaultConfig, new ArrayList<MetricsReporter>(0), Time.SYSTEM);
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java b/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java
index ff9f218..48f9f93 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java
@@ -32,7 +32,6 @@ import org.apache.kafka.common.security.authenticator.AbstractLogin;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.utils.Shell;
import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,7 +51,7 @@ public class KerberosLogin extends AbstractLogin {
private static final Random RNG = new Random();
- private final Time time = new SystemTime();
+ private final Time time = Time.SYSTEM;
private Thread t;
private boolean isKrbTicket;
private boolean isUsingTicketCache;
@@ -383,7 +382,7 @@ public class KerberosLogin extends AbstractLogin {
}
private long currentElapsedTime() {
- return time.nanoseconds() / 1000000;
+ return time.hiResClockMs();
}
private long currentWallTime() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java b/clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java
index 18725de..1e32c6c 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java
@@ -16,8 +16,11 @@
*/
package org.apache.kafka.common.utils;
+import java.util.concurrent.TimeUnit;
+
/**
- * A time implementation that uses the system clock and sleep call
+ * A time implementation that uses the system clock and sleep call. Use `Time.SYSTEM` instead of creating an instance
+ * of this class.
*/
public class SystemTime implements Time {
@@ -27,6 +30,11 @@ public class SystemTime implements Time {
}
@Override
+ public long hiResClockMs() {
+ return TimeUnit.NANOSECONDS.toMillis(nanoseconds());
+ }
+
+ @Override
public long nanoseconds() {
return System.nanoTime();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/clients/src/main/java/org/apache/kafka/common/utils/Time.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Time.java b/clients/src/main/java/org/apache/kafka/common/utils/Time.java
index b2fad7f..c782619 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/Time.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Time.java
@@ -17,17 +17,34 @@
package org.apache.kafka.common.utils;
/**
- * An interface abstracting the clock to use in unit testing classes that make use of clock time
+ * An interface abstracting the clock to use in unit testing classes that make use of clock time.
+ *
+ * Implementations of this class should be thread-safe.
*/
public interface Time {
+ Time SYSTEM = new SystemTime();
+
/**
- * The current time in milliseconds
+ * Returns the current time in milliseconds.
*/
long milliseconds();
/**
- * The current time in nanoseconds
+ * Returns the value returned by `nanoseconds` converted into milliseconds.
+ */
+ long hiResClockMs();
+
+ /**
+ * Returns the current value of the running JVM's high-resolution time source, in nanoseconds.
+ *
+ * <p>This method can only be used to measure elapsed time and is
+ * not related to any other notion of system or wall-clock time.
+ * The value returned represents nanoseconds since some fixed but
+ * arbitrary <i>origin</i> time (perhaps in the future, so values
+ * may be negative). The same origin is used by all invocations of
+ * this method in an instance of a Java virtual machine; other
+ * virtual machine instances are likely to use a different origin.
*/
long nanoseconds();
http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
index 3756d8a..41ac4f0 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
@@ -19,7 +19,7 @@ package org.apache.kafka.clients.producer.internals;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.utils.Time;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Test;
@@ -40,7 +40,6 @@ import static org.junit.Assert.assertEquals;
public class BufferPoolTest {
private final MockTime time = new MockTime();
- private final SystemTime systemTime = new SystemTime();
private final Metrics metrics = new Metrics(time);
private final long maxBlockTimeMs = 2000;
private final String metricGroup = "TestMetrics";
@@ -124,7 +123,7 @@ public class BufferPoolTest {
private void delayedDeallocate(final BufferPool pool, final ByteBuffer buffer, final long delayMs) {
Thread thread = new Thread() {
public void run() {
- systemTime.sleep(delayMs);
+ Time.SYSTEM.sleep(delayMs);
pool.deallocate(buffer);
}
};
@@ -154,7 +153,7 @@ public class BufferPoolTest {
*/
@Test
public void testBlockTimeout() throws Exception {
- BufferPool pool = new BufferPool(10, 1, metrics, systemTime, metricGroup);
+ BufferPool pool = new BufferPool(10, 1, metrics, Time.SYSTEM, metricGroup);
ByteBuffer buffer1 = pool.allocate(1, maxBlockTimeMs);
ByteBuffer buffer2 = pool.allocate(1, maxBlockTimeMs);
ByteBuffer buffer3 = pool.allocate(1, maxBlockTimeMs);
@@ -164,14 +163,14 @@ public class BufferPoolTest {
// The third buffer will be de-allocated after maxBlockTimeMs since the most recent de-allocation
delayedDeallocate(pool, buffer3, maxBlockTimeMs / 2 * 5);
- long beginTimeMs = systemTime.milliseconds();
+ long beginTimeMs = Time.SYSTEM.milliseconds();
try {
pool.allocate(10, maxBlockTimeMs);
fail("The buffer allocated more memory than its maximum value 10");
} catch (TimeoutException e) {
// this is good
}
- long endTimeMs = systemTime.milliseconds();
+ long endTimeMs = Time.SYSTEM.milliseconds();
assertTrue("Allocation should finish not much later than maxBlockTimeMs", endTimeMs - beginTimeMs < maxBlockTimeMs + 1000);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index 216f07e..28521e8 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -41,7 +41,7 @@ import org.apache.kafka.common.record.LogEntry;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.utils.Time;
import org.junit.After;
import org.junit.Test;
@@ -60,7 +60,6 @@ public class RecordAccumulatorTest {
private PartitionInfo part2 = new PartitionInfo(topic, partition2, node1, null, null);
private PartitionInfo part3 = new PartitionInfo(topic, partition3, node2, null, null);
private MockTime time = new MockTime();
- private SystemTime systemTime = new SystemTime();
private byte[] key = "key".getBytes();
private byte[] value = "value".getBytes();
private int msgSize = Records.LOG_OVERHEAD + Record.recordSize(key, value);
@@ -294,7 +293,7 @@ public class RecordAccumulatorTest {
private void delayedInterrupt(final Thread thread, final long delayMs) {
Thread t = new Thread() {
public void run() {
- systemTime.sleep(delayMs);
+ Time.SYSTEM.sleep(delayMs);
thread.interrupt();
}
};
http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/clients/src/test/java/org/apache/kafka/common/utils/MockTime.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/MockTime.java b/clients/src/test/java/org/apache/kafka/common/utils/MockTime.java
index 533f869..8178f4c 100644
--- a/clients/src/test/java/org/apache/kafka/common/utils/MockTime.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/MockTime.java
@@ -13,39 +13,60 @@
package org.apache.kafka.common.utils;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
/**
* A clock that you can manually advance by calling sleep
*/
public class MockTime implements Time {
- private long nanos = 0;
- private long autoTickMs = 0;
+ private final long autoTickMs;
+
+ // Values from `nanoTime` and `currentTimeMillis` are not comparable, so we store them separately to allow tests
+ // using this class to detect bugs where this is incorrectly assumed to be true
+ private final AtomicLong timeMs;
+ private final AtomicLong highResTimeNs;
public MockTime() {
- this.nanos = System.nanoTime();
+ this(0);
}
public MockTime(long autoTickMs) {
- this.nanos = System.nanoTime();
+ this(autoTickMs, System.currentTimeMillis(), System.nanoTime());
+ }
+
+ public MockTime(long autoTickMs, long currentTimeMs, long currentHighResTimeNs) {
+ this.timeMs = new AtomicLong(currentTimeMs);
+ this.highResTimeNs = new AtomicLong(currentHighResTimeNs);
this.autoTickMs = autoTickMs;
}
@Override
public long milliseconds() {
- this.sleep(autoTickMs);
- return TimeUnit.MILLISECONDS.convert(this.nanos, TimeUnit.NANOSECONDS);
+ maybeSleep(autoTickMs);
+ return timeMs.get();
}
@Override
public long nanoseconds() {
- this.sleep(autoTickMs);
- return nanos;
+ maybeSleep(autoTickMs);
+ return highResTimeNs.get();
+ }
+
+ @Override
+ public long hiResClockMs() {
+ return TimeUnit.NANOSECONDS.toMillis(nanoseconds());
+ }
+
+ private void maybeSleep(long ms) {
+ if (ms != 0)
+ sleep(ms);
}
@Override
public void sleep(long ms) {
- this.nanos += TimeUnit.NANOSECONDS.convert(ms, TimeUnit.MILLISECONDS);
+ timeMs.addAndGet(ms);
+ highResTimeNs.addAndGet(TimeUnit.MILLISECONDS.toNanos(ms));
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/clients/src/test/java/org/apache/kafka/test/Microbenchmarks.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/Microbenchmarks.java b/clients/src/test/java/org/apache/kafka/test/Microbenchmarks.java
index 8cd19b2..f7a47b1 100644
--- a/clients/src/test/java/org/apache/kafka/test/Microbenchmarks.java
+++ b/clients/src/test/java/org/apache/kafka/test/Microbenchmarks.java
@@ -23,7 +23,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.kafka.common.utils.CopyOnWriteMap;
-import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.utils.Time;
public class Microbenchmarks {
@@ -71,7 +71,7 @@ public class Microbenchmarks {
System.out.println(loc);
System.out.println("binary search: " + (System.nanoTime() - start) / iters);
- final SystemTime time = new SystemTime();
+ final Time time = Time.SYSTEM;
final AtomicBoolean done = new AtomicBoolean(false);
final Object lock = new Object();
Thread t1 = new Thread() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
index c3a61b2..fc957a7 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
@@ -18,7 +18,6 @@
package org.apache.kafka.connect.cli;
import org.apache.kafka.common.annotation.InterfaceStability;
-import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.runtime.Connect;
@@ -62,7 +61,7 @@ public class ConnectDistributed {
Map<String, String> workerProps = !workerPropsFile.isEmpty() ?
Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : Collections.<String, String>emptyMap();
- Time time = new SystemTime();
+ Time time = Time.SYSTEM;
ConnectorFactory connectorFactory = new ConnectorFactory();
DistributedConfig config = new DistributedConfig(workerProps);
http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
index 65a71af..c125a33 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
@@ -18,7 +18,6 @@
package org.apache.kafka.connect.cli;
import org.apache.kafka.common.annotation.InterfaceStability;
-import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.runtime.Connect;
@@ -67,7 +66,7 @@ public class ConnectStandalone {
Map<String, String> workerProps = !workerPropsFile.isEmpty() ?
Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : Collections.<String, String>emptyMap();
- Time time = new SystemTime();
+ Time time = Time.SYSTEM;
ConnectorFactory connectorFactory = new ConnectorFactory();
StandaloneConfig config = new StandaloneConfig(workerProps);
http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
index 1a46693..03d84ae 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
@@ -25,7 +25,7 @@ import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
@@ -421,7 +421,7 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
private KafkaBasedLog<String, byte[]> createKafkaBasedLog(String topic, Map<String, Object> producerProps,
Map<String, Object> consumerProps, Callback<ConsumerRecord<String, byte[]>> consumedCallback) {
- return new KafkaBasedLog<>(topic, producerProps, consumerProps, consumedCallback, new SystemTime());
+ return new KafkaBasedLog<>(topic, producerProps, consumerProps, consumedCallback, Time.SYSTEM);
}
@SuppressWarnings("unchecked")
http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
index 9219986..4d66288 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
@@ -24,7 +24,7 @@ import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.util.Callback;
@@ -138,7 +138,7 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore {
private KafkaBasedLog<byte[], byte[]> createKafkaBasedLog(String topic, Map<String, Object> producerProps,
Map<String, Object> consumerProps, Callback<ConsumerRecord<byte[], byte[]>> consumedCallback) {
- return new KafkaBasedLog<>(topic, producerProps, consumerProps, consumedCallback, new SystemTime());
+ return new KafkaBasedLog<>(topic, producerProps, consumerProps, consumedCallback, Time.SYSTEM);
}
private static class SetCallbackFuture implements org.apache.kafka.clients.producer.Callback, Future<Void> {
http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
index 30c7118..86f5797 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@@ -21,7 +21,7 @@ import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import org.apache.kafka.connect.source.SourceRecord;
@@ -125,7 +125,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
private void createWorkerTask(TargetState initialState) {
workerTask = new WorkerSourceTask(taskId, sourceTask, statusListener, initialState, keyConverter,
- valueConverter, producer, offsetReader, offsetWriter, config, new SystemTime());
+ valueConverter, producer, offsetReader, offsetWriter, config, Time.SYSTEM);
}
@Test
http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/connect/runtime/src/test/java/org/apache/kafka/connect/util/MockTime.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/MockTime.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/MockTime.java
index 85f6895..e13ddb0 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/MockTime.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/MockTime.java
@@ -19,31 +19,37 @@ package org.apache.kafka.connect.util;
import org.apache.kafka.common.utils.Time;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
/**
* A clock that you can manually advance by calling sleep
*/
public class MockTime implements Time {
- private long nanos = 0;
+ private final AtomicLong nanos;
public MockTime() {
- this.nanos = System.nanoTime();
+ this.nanos = new AtomicLong(System.nanoTime());
}
@Override
public long milliseconds() {
- return TimeUnit.MILLISECONDS.convert(this.nanos, TimeUnit.NANOSECONDS);
+ return TimeUnit.MILLISECONDS.convert(this.nanos.get(), TimeUnit.NANOSECONDS);
+ }
+
+ @Override
+ public long hiResClockMs() {
+ return TimeUnit.NANOSECONDS.toMillis(nanos.get());
}
@Override
public long nanoseconds() {
- return nanos;
+ return nanos.get();
}
@Override
public void sleep(long ms) {
- this.nanos += TimeUnit.NANOSECONDS.convert(ms, TimeUnit.MILLISECONDS);
+ this.nanos.addAndGet(TimeUnit.NANOSECONDS.convert(ms, TimeUnit.MILLISECONDS));
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/main/scala/kafka/admin/AdminClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala
index 9cd4823..a7e7ebc 100644
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -17,7 +17,7 @@ import java.util.{Collections, Properties}
import java.util.concurrent.atomic.AtomicInteger
import kafka.common.KafkaException
-import kafka.coordinator.{GroupOverview, MemberSummary}
+import kafka.coordinator.GroupOverview
import kafka.utils.Logging
import org.apache.kafka.clients._
import org.apache.kafka.clients.consumer.internals.{ConsumerNetworkClient, ConsumerProtocol, RequestFuture}
@@ -25,10 +25,9 @@ import org.apache.kafka.common.config.ConfigDef.{Importance, Type}
import org.apache.kafka.common.config.{AbstractConfig, ConfigDef}
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.Selector
-import org.apache.kafka.common.protocol.types.Struct
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests._
-import org.apache.kafka.common.utils.{SystemTime, Time, Utils}
+import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{Cluster, Node, TopicPartition}
import scala.collection.JavaConverters._
@@ -200,7 +199,7 @@ object AdminClient {
def create(props: Map[String, _]): AdminClient = create(new AdminConfig(props))
def create(config: AdminConfig): AdminClient = {
- val time = new SystemTime
+ val time = Time.SYSTEM
val metrics = new Metrics(time)
val metadata = new Metadata
val channelBuilder = ClientUtils.createChannelBuilder(config.values())
http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 44d6a77..c7d4044 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -35,6 +35,7 @@ import org.apache.kafka.common.protocol.Errors
import scala.collection.JavaConverters._
import com.yammer.metrics.core.Gauge
import org.apache.kafka.common.requests.PartitionState
+import org.apache.kafka.common.utils.Time
/**
* Data structure that represents a topic partition. The leader maintains the AR, ISR, CUR, RAR
http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/main/scala/kafka/cluster/Replica.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Replica.scala b/core/src/main/scala/kafka/cluster/Replica.scala
index 13c1921..40cf181 100644
--- a/core/src/main/scala/kafka/cluster/Replica.scala
+++ b/core/src/main/scala/kafka/cluster/Replica.scala
@@ -18,15 +18,16 @@
package kafka.cluster
import kafka.log.Log
-import kafka.utils.{SystemTime, Time, Logging}
-import kafka.server.{LogReadResult, LogOffsetMetadata}
+import kafka.utils.Logging
+import kafka.server.{LogOffsetMetadata, LogReadResult}
import kafka.common.KafkaException
-
import java.util.concurrent.atomic.AtomicLong
+import org.apache.kafka.common.utils.Time
+
class Replica(val brokerId: Int,
val partition: Partition,
- time: Time = SystemTime,
+ time: Time = Time.SYSTEM,
initialHighWatermarkValue: Long = 0L,
val log: Option[Log] = None) extends Logging {
// the high watermark offset value, in non-leader replicas only its message offsets are kept
http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
index ef8190c..960f690 100644
--- a/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
+++ b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
@@ -18,10 +18,12 @@ package kafka.common
import java.util.concurrent.atomic.AtomicBoolean
-import kafka.utils.{Time, SystemTime, ZkUtils, Logging}
+import kafka.utils.{Logging, ZkUtils}
import org.apache.zookeeper.Watcher.Event.KeeperState
import org.I0Itec.zkclient.exception.ZkInterruptedException
-import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener}
+import org.I0Itec.zkclient.{IZkChildListener, IZkStateListener}
+import org.apache.kafka.common.utils.Time
+
import scala.collection.JavaConverters._
/**
@@ -53,7 +55,7 @@ class ZkNodeChangeNotificationListener(private val zkUtils: ZkUtils,
private val seqNodePrefix: String,
private val notificationHandler: NotificationHandler,
private val changeExpirationMs: Long = 15 * 60 * 1000,
- private val time: Time = SystemTime) extends Logging {
+ private val time: Time = Time.SYSTEM) extends Logging {
private var lastExecutedChange = -1L
private val isClosed = new AtomicBoolean(false)
http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
index 5b5fe0d..dcdeb1e 100755
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
@@ -17,17 +17,20 @@
package kafka.consumer
-import kafka.server.{BrokerAndInitialOffset, AbstractFetcherThread, AbstractFetcherManager}
+import kafka.server.{AbstractFetcherManager, AbstractFetcherThread, BrokerAndInitialOffset}
import kafka.cluster.{BrokerEndPoint, Cluster}
import org.apache.kafka.common.protocol.SecurityProtocol
import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.utils.Time
+
import scala.collection.immutable
import collection.mutable.HashMap
import scala.collection.mutable
import java.util.concurrent.locks.ReentrantLock
+
import kafka.utils.CoreUtils.inLock
import kafka.utils.ZkUtils
-import kafka.utils.{ShutdownableThread, SystemTime}
+import kafka.utils.ShutdownableThread
import kafka.client.ClientUtils
import java.util.concurrent.atomic.AtomicInteger
@@ -39,7 +42,7 @@ import java.util.concurrent.atomic.AtomicInteger
class ConsumerFetcherManager(private val consumerIdString: String,
private val config: ConsumerConfig,
private val zkUtils : ZkUtils)
- extends AbstractFetcherManager("ConsumerFetcherManager-%d".format(SystemTime.milliseconds),
+ extends AbstractFetcherManager("ConsumerFetcherManager-%d".format(Time.SYSTEM.milliseconds),
config.clientId, config.numConsumerFetchers) {
private var partitionMap: immutable.Map[TopicPartition, PartitionTopicInfo] = null
private var cluster: Cluster = null
http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index 22a0c9a..0b89477 100755
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -39,6 +39,7 @@ import org.I0Itec.zkclient.exception.ZkNodeExistsException
import org.I0Itec.zkclient.{IZkChildListener, IZkDataListener, IZkStateListener}
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.security.JaasUtils
+import org.apache.kafka.common.utils.Time
import org.apache.zookeeper.Watcher.Event.KeeperState
import scala.collection._
@@ -271,7 +272,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
private def registerConsumerInZK(dirs: ZKGroupDirs, consumerIdString: String, topicCount: TopicCount) {
info("begin registering consumer " + consumerIdString + " in ZK")
- val timestamp = SystemTime.milliseconds.toString
+ val timestamp = Time.SYSTEM.milliseconds.toString
val consumerRegistrationInfo = Json.encode(Map("version" -> 1, "subscription" -> topicCount.getTopicCountMap, "pattern" -> topicCount.pattern,
"timestamp" -> timestamp))
val zkWatchedEphemeral = new ZKCheckedEphemeral(dirs.
http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 1f6e19a..d3137c3 100755
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -160,7 +160,7 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
val partitionStateMachine = new PartitionStateMachine(this)
val replicaStateMachine = new ReplicaStateMachine(this)
private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath, onControllerFailover,
- onControllerResignation, config.brokerId)
+ onControllerResignation, config.brokerId, time)
// have a separate scheduler for the controller to be able to start and stop independently of the
// kafka server
private val autoRebalanceScheduler = new KafkaScheduler(1)
http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/main/scala/kafka/log/FileMessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/FileMessageSet.scala b/core/src/main/scala/kafka/log/FileMessageSet.scala
index c33e376..506f5b9 100755
--- a/core/src/main/scala/kafka/log/FileMessageSet.scala
+++ b/core/src/main/scala/kafka/log/FileMessageSet.scala
@@ -21,12 +21,11 @@ import java.io._
import java.nio._
import java.nio.channels._
import java.util.concurrent.atomic._
+import java.util.concurrent.TimeUnit
import kafka.utils._
import kafka.message._
import kafka.common.KafkaException
-import java.util.concurrent.TimeUnit
-
import kafka.metrics.{KafkaMetricsGroup, KafkaTimer}
import org.apache.kafka.common.errors.CorruptRecordException
import org.apache.kafka.common.record.FileRecords
http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 9e3dfac..24177d1 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -28,14 +28,14 @@ import java.util.concurrent.{ConcurrentNavigableMap, ConcurrentSkipListMap}
import java.util.concurrent.atomic._
import java.text.NumberFormat
-import org.apache.kafka.common.errors.{UnsupportedForMessageFormatException, CorruptRecordException, OffsetOutOfRangeException, RecordBatchTooLargeException, RecordTooLargeException}
+import org.apache.kafka.common.errors.{CorruptRecordException, OffsetOutOfRangeException, RecordBatchTooLargeException, RecordTooLargeException, UnsupportedForMessageFormatException}
import org.apache.kafka.common.record.TimestampType
import org.apache.kafka.common.requests.ListOffsetRequest
import scala.collection.Seq
import scala.collection.JavaConverters._
import com.yammer.metrics.core.Gauge
-import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.utils.{Time, Utils}
object LogAppendInfo {
val UnknownLogAppendInfo = LogAppendInfo(-1, -1, Message.NoTimestamp, -1L, Message.NoTimestamp, NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false)
@@ -87,7 +87,7 @@ class Log(@volatile var dir: File,
@volatile var config: LogConfig,
@volatile var recoveryPoint: Long = 0L,
scheduler: Scheduler,
- time: Time = SystemTime) extends Logging with KafkaMetricsGroup {
+ time: Time = Time.SYSTEM) extends Logging with KafkaMetricsGroup {
import kafka.log.Log._
@@ -103,20 +103,23 @@ class Log(@volatile var dir: File,
else
0
}
- val t = time.milliseconds
+
/* the actual segments of the log */
private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]
- loadSegments()
+ locally {
+ val startMs = time.milliseconds
+ loadSegments()
+ info("Completed load of log %s with %d log segments and log end offset %d in %d ms"
+ .format(name, segments.size(), logEndOffset, time.milliseconds - startMs))
+ }
/* Calculate the offset of the next message */
- @volatile var nextOffsetMetadata = new LogOffsetMetadata(activeSegment.nextOffset(), activeSegment.baseOffset, activeSegment.size.toInt)
+ @volatile private var nextOffsetMetadata = new LogOffsetMetadata(activeSegment.nextOffset(), activeSegment.baseOffset,
+ activeSegment.size.toInt)
val topicAndPartition: TopicAndPartition = Log.parseTopicPartitionName(dir)
- info("Completed load of log %s with %d log segments and log end offset %d in %d ms"
- .format(name, segments.size(), logEndOffset, time.milliseconds - t))
-
- val tags = Map("topic" -> topicAndPartition.topic, "partition" -> topicAndPartition.partition.toString)
+ private val tags = Map("topic" -> topicAndPartition.topic, "partition" -> topicAndPartition.partition.toString)
newGauge("NumLogSegments",
new Gauge[Int] {
http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 17824ec..4a76b0c 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -27,6 +27,7 @@ import kafka.common._
import kafka.message._
import kafka.metrics.KafkaMetricsGroup
import kafka.utils._
+import org.apache.kafka.common.utils.Time
import scala.Iterable
import scala.collection._
@@ -69,7 +70,7 @@ import scala.collection._
class LogCleaner(val config: CleanerConfig,
val logDirs: Array[File],
val logs: Pool[TopicAndPartition, Log],
- time: Time = SystemTime) extends Logging with KafkaMetricsGroup {
+ time: Time = Time.SYSTEM) extends Logging with KafkaMetricsGroup {
/* for managing the state of partitions being cleaned. package-private to allow access in tests */
private[log] val cleanerManager = new LogCleanerManager(logDirs, logs)
@@ -648,7 +649,7 @@ private[log] class Cleaner(val id: Int,
/**
* A simple struct for collecting stats about log cleaning
*/
-private class CleanerStats(time: Time = SystemTime) {
+private class CleanerStats(time: Time = Time.SYSTEM) {
val startTime = time.milliseconds
var mapCompleteTime = -1L
var endTime = -1L
http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/main/scala/kafka/log/LogCleanerManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index b808348..92cbf0f 100755
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -26,7 +26,8 @@ import kafka.common.{LogCleaningAbortedException, TopicAndPartition}
import kafka.metrics.KafkaMetricsGroup
import kafka.server.OffsetCheckpoint
import kafka.utils.CoreUtils._
-import kafka.utils.{Logging, Pool, Time}
+import kafka.utils.{Logging, Pool}
+import org.apache.kafka.common.utils.Time
import scala.collection.{immutable, mutable}
http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 64b277a..ed79946 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -27,6 +27,7 @@ import scala.collection.JavaConverters._
import kafka.common.{KafkaStorageException, KafkaException, TopicAndPartition}
import kafka.server.{BrokerState, OffsetCheckpoint, RecoveringFromUncleanShutdown}
import java.util.concurrent.{ExecutionException, ExecutorService, Executors, Future}
+import org.apache.kafka.common.utils.Time
/**
* The entry point to the kafka log management subsystem. The log manager is responsible for log creation, retrieval, and cleaning.
@@ -49,7 +50,7 @@ class LogManager(val logDirs: Array[File],
val retentionCheckMs: Long,
scheduler: Scheduler,
val brokerState: BrokerState,
- private val time: Time) extends Logging {
+ time: Time) extends Logging {
val RecoveryPointCheckpointFile = "recovery-point-offset-checkpoint"
val LockFile = ".lock"
val InitialTaskDelayMs = 30*1000
http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/main/scala/kafka/log/LogSegment.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index c63a7d6..c5418e3 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -19,12 +19,12 @@ package kafka.log
import kafka.message._
import kafka.common._
import kafka.utils._
-import kafka.server.{LogOffsetMetadata, FetchDataInfo}
+import kafka.server.{FetchDataInfo, LogOffsetMetadata}
import org.apache.kafka.common.errors.CorruptRecordException
+import org.apache.kafka.common.utils.Time
import scala.math._
-import java.io.{IOException, File}
-
+import java.io.{File, IOException}
/**
* A segment of the log. Each segment has two components: a log and an index. The log is a FileMessageSet containing
@@ -50,7 +50,7 @@ class LogSegment(val log: FileMessageSet,
val rollJitterMs: Long,
time: Time) extends Logging {
- var created = time.milliseconds
+ private var created = time.milliseconds
/* the number of bytes since we last added an entry in the offset index */
private var bytesSinceLastIndexEntry = 0
http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 0cece68..a19ad22 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -26,7 +26,7 @@ import com.yammer.metrics.core.Gauge
import kafka.api.{ControlledShutdownRequest, RequestOrResponse}
import kafka.metrics.KafkaMetricsGroup
import kafka.server.QuotaId
-import kafka.utils.{Logging, SystemTime}
+import kafka.utils.Logging
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.InvalidRequestException
import org.apache.kafka.common.network.Send
@@ -34,6 +34,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Protocol, SecurityProtocol}
import org.apache.kafka.common.record.MemoryRecords
import org.apache.kafka.common.requests._
import org.apache.kafka.common.security.auth.KafkaPrincipal
+import org.apache.kafka.common.utils.Time
import org.apache.log4j.Logger
object RequestChannel extends Logging {
@@ -107,7 +108,7 @@ object RequestChannel extends Logging {
trace("Processor %d received request : %s".format(processor, requestDesc(true)))
def updateRequestMetrics() {
- val endTimeMs = SystemTime.milliseconds
+ val endTimeMs = Time.SYSTEM.milliseconds
// In some corner cases, apiLocalCompleteTimeMs may not be set when the request completes if the remote
// processing time is really small. This value is set in KafkaApis from a request handling thread.
// This may be read in a network thread before the actual update happens in KafkaApis which will cause us to
@@ -158,7 +159,7 @@ object RequestChannel extends Logging {
}
case class Response(processor: Int, request: Request, responseSend: Send, responseAction: ResponseAction) {
- request.responseCompleteTimeMs = SystemTime.milliseconds
+ request.responseCompleteTimeMs = Time.SYSTEM.milliseconds
def this(processor: Int, request: Request, responseSend: Send) =
this(processor, request, responseSend, if (responseSend == null) NoOpAction else SendAction)
@@ -241,7 +242,7 @@ class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMe
def receiveResponse(processor: Int): RequestChannel.Response = {
val response = responseQueues(processor).poll()
if (response != null)
- response.request.responseDequeueTimeMs = SystemTime.milliseconds
+ response.request.responseDequeueTimeMs = Time.SYSTEM.milliseconds
response
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
index f9591ad..380b1c8 100755
--- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
+++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
@@ -18,18 +18,20 @@
package kafka.producer.async
import kafka.common._
-import kafka.message.{NoCompressionCodec, Message, ByteBufferMessageSet}
+import kafka.message.{ByteBufferMessageSet, Message, NoCompressionCodec}
import kafka.producer._
import kafka.serializer.Encoder
import kafka.utils._
import org.apache.kafka.common.errors.{LeaderNotAvailableException, UnknownTopicOrPartitionException}
import org.apache.kafka.common.protocol.Errors
+
import scala.util.Random
-import scala.collection.{Seq, Map}
+import scala.collection.{Map, Seq}
import scala.collection.mutable.{ArrayBuffer, HashMap, Set}
import java.util.concurrent.atomic._
-import kafka.api.{TopicMetadata, ProducerRequest}
-import org.apache.kafka.common.utils.Utils
+
+import kafka.api.{ProducerRequest, TopicMetadata}
+import org.apache.kafka.common.utils.{Time, Utils}
@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0")
class DefaultEventHandler[K,V](config: ProducerConfig,
@@ -38,7 +40,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
private val keyEncoder: Encoder[K],
private val producerPool: ProducerPool,
private val topicPartitionInfos: HashMap[String, TopicMetadata] = new HashMap[String, TopicMetadata],
- private val time: Time = SystemTime)
+ private val time: Time = Time.SYSTEM)
extends EventHandler[K,V] with Logging {
val isSync = ("sync" == config.producerType)
@@ -69,11 +71,11 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
while (remainingRetries > 0 && outstandingProduceRequests.nonEmpty) {
topicMetadataToRefresh ++= outstandingProduceRequests.map(_.topic)
if (topicMetadataRefreshInterval >= 0 &&
- SystemTime.milliseconds - lastTopicMetadataRefreshTime > topicMetadataRefreshInterval) {
+ Time.SYSTEM.milliseconds - lastTopicMetadataRefreshTime > topicMetadataRefreshInterval) {
CoreUtils.swallowError(brokerPartitionInfo.updateInfo(topicMetadataToRefresh.toSet, correlationId.getAndIncrement))
sendPartitionPerTopicCache.clear()
topicMetadataToRefresh.clear
- lastTopicMetadataRefreshTime = SystemTime.milliseconds
+ lastTopicMetadataRefreshTime = Time.SYSTEM.milliseconds
}
outstandingProduceRequests = dispatchSerializedData(outstandingProduceRequests)
if (outstandingProduceRequests.nonEmpty) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala b/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
index d423757..79ed1b8 100644
--- a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
+++ b/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
@@ -17,12 +17,14 @@
package kafka.producer.async
-import kafka.utils.{SystemTime, Logging}
-import java.util.concurrent.{TimeUnit, CountDownLatch, BlockingQueue}
+import kafka.utils.Logging
+import java.util.concurrent.{BlockingQueue, CountDownLatch, TimeUnit}
+
import collection.mutable.ArrayBuffer
import kafka.producer.KeyedMessage
import kafka.metrics.KafkaMetricsGroup
import com.yammer.metrics.core.Gauge
+import org.apache.kafka.common.utils.Time
@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0")
class ProducerSendThread[K,V](val threadName: String,
@@ -59,15 +61,15 @@ class ProducerSendThread[K,V](val threadName: String,
}
private def processEvents() {
- var lastSend = SystemTime.milliseconds
+ var lastSend = Time.SYSTEM.milliseconds
var events = new ArrayBuffer[KeyedMessage[K,V]]
var full: Boolean = false
// drain the queue until you get a shutdown command
- Iterator.continually(queue.poll(scala.math.max(0, (lastSend + queueTime) - SystemTime.milliseconds), TimeUnit.MILLISECONDS))
+ Iterator.continually(queue.poll(scala.math.max(0, (lastSend + queueTime) - Time.SYSTEM.milliseconds), TimeUnit.MILLISECONDS))
.takeWhile(item => if(item != null) item ne shutdownCommand else true).foreach {
currentQueueItem =>
- val elapsed = (SystemTime.milliseconds - lastSend)
+ val elapsed = Time.SYSTEM.milliseconds - lastSend
// check if the queue time is reached. This happens when the poll method above returns after a timeout and
// returns a null object
val expired = currentQueueItem == null
@@ -87,7 +89,7 @@ class ProducerSendThread[K,V](val threadName: String,
debug("Batch full. Sending..")
// if either queue time has reached or batch size has reached, dispatch to event handler
tryToHandle(events)
- lastSend = SystemTime.milliseconds
+ lastSend = Time.SYSTEM.milliseconds
events = new ArrayBuffer[KeyedMessage[K,V]]
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/main/scala/kafka/server/DelayedOperation.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedOperation.scala b/core/src/main/scala/kafka/server/DelayedOperation.scala
index 5248edf..dbee092 100644
--- a/core/src/main/scala/kafka/server/DelayedOperation.scala
+++ b/core/src/main/scala/kafka/server/DelayedOperation.scala
@@ -27,8 +27,6 @@ import java.util.concurrent._
import java.util.concurrent.atomic._
import java.util.concurrent.locks.ReentrantReadWriteLock
-import org.apache.kafka.common.utils.Utils
-
import scala.collection._
import com.yammer.metrics.core.Gauge
http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/main/scala/kafka/server/DynamicConfigManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DynamicConfigManager.scala b/core/src/main/scala/kafka/server/DynamicConfigManager.scala
index 2e9e714..e0e6a03 100644
--- a/core/src/main/scala/kafka/server/DynamicConfigManager.scala
+++ b/core/src/main/scala/kafka/server/DynamicConfigManager.scala
@@ -18,18 +18,13 @@
package kafka.server
import kafka.common.{NotificationHandler, ZkNodeChangeNotificationListener}
-import kafka.security.auth.Resource
import kafka.utils.Json
import kafka.utils.Logging
-import kafka.utils.SystemTime
-import kafka.utils.Time
import kafka.utils.ZkUtils
-import org.apache.zookeeper.Watcher.Event.KeeperState
import scala.collection._
import kafka.admin.AdminUtils
-import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener, ZkClient}
-
+import org.apache.kafka.common.utils.Time
/**
* Represents all the entities that can be configured via ZK
@@ -87,7 +82,7 @@ object ConfigEntityName {
class DynamicConfigManager(private val zkUtils: ZkUtils,
private val configHandlers: Map[String, ConfigHandler],
private val changeExpirationMs: Long = 15*60*1000,
- private val time: Time = SystemTime) extends Logging {
+ private val time: Time = Time.SYSTEM) extends Logging {
object ConfigChangedNotificationHandler extends NotificationHandler {
override def processNotification(json: String) = {
http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 296beb3..fa3db5c 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -30,19 +30,19 @@ import kafka.common._
import kafka.controller.KafkaController
import kafka.coordinator.{GroupCoordinator, JoinGroupResult}
import kafka.log._
-import kafka.message.{ByteBufferMessageSet, Message, MessageSet}
+import kafka.message.{ByteBufferMessageSet, Message}
import kafka.network._
import kafka.network.RequestChannel.{Response, Session}
import kafka.security.auth
import kafka.security.auth.{Authorizer, ClusterAction, Create, Delete, Describe, Group, Operation, Read, Resource, Write}
-import kafka.utils.{Logging, SystemTime, ZKGroupTopicDirs, ZkUtils}
+import kafka.utils.{Logging, ZKGroupTopicDirs, ZkUtils}
import org.apache.kafka.common.errors.{ClusterAuthorizationException, NotLeaderForPartitionException, TopicExistsException, UnknownTopicOrPartitionException, UnsupportedForMessageFormatException}
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.{ApiKeys, Errors, Protocol, SecurityProtocol}
import org.apache.kafka.common.record.MemoryRecords
import org.apache.kafka.common.requests._
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
-import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{Node, TopicPartition}
import org.apache.kafka.common.requests.SaslHandshakeResponse
@@ -64,7 +64,8 @@ class KafkaApis(val requestChannel: RequestChannel,
val metrics: Metrics,
val authorizer: Option[Authorizer],
val quotas: QuotaManagers,
- val clusterId: String) extends Logging {
+ val clusterId: String,
+ time: Time) extends Logging {
this.logIdent = "[KafkaApi-%d] ".format(brokerId)
@@ -117,7 +118,7 @@ class KafkaApis(val requestChannel: RequestChannel,
error("Error when handling request %s".format(request.body), e)
}
} finally
- request.apiLocalCompleteTimeMs = SystemTime.milliseconds
+ request.apiLocalCompleteTimeMs = time.milliseconds
}
def handleLeaderAndIsrRequest(request: RequestChannel.Request) {
@@ -308,7 +309,7 @@ class KafkaApis(val requestChannel: RequestChannel,
// - If v1 and no explicit commit timestamp is provided we use default expiration timestamp.
// - If v1 and explicit commit timestamp is provided we calculate retention from that explicit commit timestamp
// - If v2 we use the default expiration timestamp
- val currentTimestamp = SystemTime.milliseconds
+ val currentTimestamp = time.milliseconds
val defaultExpireTimestamp = offsetRetention + currentTimestamp
val partitionData = authorizedTopics.mapValues { partitionData =>
val metadata = if (partitionData.metadata == null) OffsetMetadata.NoMetadata else partitionData.metadata
@@ -407,7 +408,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
// When this callback is triggered, the remote API call has completed
- request.apiRemoteCompleteTimeMs = SystemTime.milliseconds
+ request.apiRemoteCompleteTimeMs = time.milliseconds
quotas.produce.recordAndMaybeThrottle(
request.session.sanitizedUser,
@@ -515,7 +516,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
// When this callback is triggered, the remote API call has completed
- request.apiRemoteCompleteTimeMs = SystemTime.milliseconds
+ request.apiRemoteCompleteTimeMs = time.milliseconds
if (fetchRequest.isFromFollower) {
// We've already evaluated against the quota and are good to go. Just need to record it now.
@@ -727,7 +728,7 @@ class KafkaApis(val requestChannel: RequestChannel,
for (i <- segsArray.indices)
offsetTimeArray(i) = (segsArray(i).baseOffset, segsArray(i).lastModified)
if (lastSegmentHasSize)
- offsetTimeArray(segsArray.length) = (log.logEndOffset, SystemTime.milliseconds)
+ offsetTimeArray(segsArray.length) = (log.logEndOffset, time.milliseconds)
var startIndex = -1
timestamp match {
http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
index f70955d..97145b4 100755
--- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
+++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
@@ -21,8 +21,9 @@ import kafka.network._
import kafka.utils._
import kafka.metrics.KafkaMetricsGroup
import java.util.concurrent.TimeUnit
+
import com.yammer.metrics.core.Meter
-import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.utils.{Time, Utils}
/**
* A thread that answers kafka requests.
@@ -32,7 +33,8 @@ class KafkaRequestHandler(id: Int,
val aggregateIdleMeter: Meter,
val totalHandlerThreads: Int,
val requestChannel: RequestChannel,
- apis: KafkaApis) extends Runnable with Logging {
+ apis: KafkaApis,
+ time: Time) extends Runnable with Logging {
this.logIdent = "[Kafka Request Handler " + id + " on Broker " + brokerId + "], "
def run() {
@@ -44,9 +46,9 @@ class KafkaRequestHandler(id: Int,
// Since meter is calculated as total_recorded_value / time_window and
// time_window is independent of the number of threads, each recorded idle
// time should be discounted by # threads.
- val startSelectTime = SystemTime.nanoseconds
+ val startSelectTime = time.nanoseconds
req = requestChannel.receiveRequest(300)
- val idleTime = SystemTime.nanoseconds - startSelectTime
+ val idleTime = time.nanoseconds - startSelectTime
aggregateIdleMeter.mark(idleTime / totalHandlerThreads)
}
@@ -55,7 +57,7 @@ class KafkaRequestHandler(id: Int,
id, brokerId))
return
}
- req.requestDequeueTimeMs = SystemTime.milliseconds
+ req.requestDequeueTimeMs = time.milliseconds
trace("Kafka request handler %d on broker %d handling request %s".format(id, brokerId, req))
apis.handle(req)
} catch {
@@ -70,6 +72,7 @@ class KafkaRequestHandler(id: Int,
class KafkaRequestHandlerPool(val brokerId: Int,
val requestChannel: RequestChannel,
val apis: KafkaApis,
+ time: Time,
numThreads: Int) extends Logging with KafkaMetricsGroup {
/* a meter to track the average free capacity of the request handlers */
@@ -79,7 +82,7 @@ class KafkaRequestHandlerPool(val brokerId: Int,
val threads = new Array[Thread](numThreads)
val runnables = new Array[KafkaRequestHandler](numThreads)
for(i <- 0 until numThreads) {
- runnables(i) = new KafkaRequestHandler(i, brokerId, aggregateIdleMeter, numThreads, requestChannel, apis)
+ runnables(i) = new KafkaRequestHandler(i, brokerId, aggregateIdleMeter, numThreads, requestChannel, apis, time)
threads(i) = Utils.daemonThread("kafka-request-handler-" + i, runnables(i))
threads(i).start()
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 4026a7e..bbddfae 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -43,7 +43,7 @@ import org.apache.kafka.common.network._
import org.apache.kafka.common.protocol.{ApiKeys, Errors, SecurityProtocol}
import org.apache.kafka.common.requests.{ControlledShutdownRequest, ControlledShutdownResponse}
import org.apache.kafka.common.security.JaasUtils
-import org.apache.kafka.common.utils.AppInfoParser
+import org.apache.kafka.common.utils.{AppInfoParser, Time}
import org.apache.kafka.common.{ClusterResource, Node}
import scala.collection.JavaConverters._
@@ -86,7 +86,7 @@ object KafkaServer {
* Represents the lifecycle of a single Kafka broker. Handles all functionality required
* to start up and shutdown a single Kafka node.
*/
-class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePrefix: Option[String] = None, kafkaMetricsReporters: Seq[KafkaMetricsReporter] = List()) extends Logging with KafkaMetricsGroup {
+class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNamePrefix: Option[String] = None, kafkaMetricsReporters: Seq[KafkaMetricsReporter] = List()) extends Logging with KafkaMetricsGroup {
private val startupComplete = new AtomicBoolean(false)
private val isShuttingDown = new AtomicBoolean(false)
private val isStartingUp = new AtomicBoolean(false)
@@ -97,10 +97,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
private val reporters: java.util.List[MetricsReporter] = config.metricReporterClasses
reporters.add(new JmxReporter(jmxPrefix))
- // This exists because the Metrics package from clients has its own Time implementation.
- // SocketServer/Quotas (which uses client libraries) have to use the client Time objects without having to convert all of Kafka to use them
- // Eventually, we want to merge the Time objects in core and clients
- private implicit val kafkaMetricsTime: org.apache.kafka.common.utils.Time = new org.apache.kafka.common.utils.SystemTime()
var metrics: Metrics = null
private val metricConfig: MetricConfig = new MetricConfig()
@@ -180,7 +176,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
val canStartup = isStartingUp.compareAndSet(false, true)
if (canStartup) {
- metrics = new Metrics(metricConfig, reporters, kafkaMetricsTime, true)
+ metrics = new Metrics(metricConfig, reporters, time, true)
quotaManagers = QuotaFactory.instantiate(config, metrics, time)
brokerState.newState(Starting)
@@ -207,22 +203,23 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
metadataCache = new MetadataCache(config.brokerId)
- socketServer = new SocketServer(config, metrics, kafkaMetricsTime)
+ socketServer = new SocketServer(config, metrics, time)
socketServer.startup()
/* start replica manager */
- replicaManager = new ReplicaManager(config, metrics, time, kafkaMetricsTime, zkUtils, kafkaScheduler, logManager,
+ replicaManager = new ReplicaManager(config, metrics, time, zkUtils, kafkaScheduler, logManager,
isShuttingDown, quotaManagers.follower)
replicaManager.startup()
/* start kafka controller */
- kafkaController = new KafkaController(config, zkUtils, brokerState, kafkaMetricsTime, metrics, threadNamePrefix)
+ kafkaController = new KafkaController(config, zkUtils, brokerState, time, metrics, threadNamePrefix)
kafkaController.startup()
adminManager = new AdminManager(config, metrics, metadataCache, zkUtils)
/* start group coordinator */
- groupCoordinator = GroupCoordinator(config, zkUtils, replicaManager, kafkaMetricsTime)
+ // Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue
+ groupCoordinator = GroupCoordinator(config, zkUtils, replicaManager, Time.SYSTEM)
groupCoordinator.startup()
/* Get the authorizer and initialize it if one is specified.*/
@@ -234,9 +231,11 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
/* start processing requests */
apis = new KafkaApis(socketServer.requestChannel, replicaManager, adminManager, groupCoordinator,
- kafkaController, zkUtils, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers, clusterId)
+ kafkaController, zkUtils, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,
+ clusterId, time)
- requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
+ requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, time,
+ config.numIoThreads)
Mx4jLoader.maybeLoad()
@@ -363,7 +362,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
NetworkReceive.UNLIMITED,
config.connectionsMaxIdleMs,
metrics,
- kafkaMetricsTime,
+ time,
"kafka-server-controlled-shutdown",
Map.empty.asJava,
false,
@@ -378,7 +377,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
Selectable.USE_DEFAULT_BUFFER_SIZE,
Selectable.USE_DEFAULT_BUFFER_SIZE,
config.requestTimeoutMs,
- kafkaMetricsTime)
+ time)
}
var shutdownSucceeded: Boolean = false
@@ -420,15 +419,15 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
if (prevController != null) {
try {
- if (!networkClient.blockingReady(node(prevController), socketTimeoutMs))
+ if (!networkClient.blockingReady(node(prevController), socketTimeoutMs)(time))
throw new SocketTimeoutException(s"Failed to connect within $socketTimeoutMs ms")
// send the controlled shutdown request
val requestHeader = networkClient.nextRequestHeader(ApiKeys.CONTROLLED_SHUTDOWN_KEY)
val controlledShutdownRequest = new ControlledShutdownRequest(config.brokerId)
- val request = new ClientRequest(node(prevController).idString, kafkaMetricsTime.milliseconds(), true,
+ val request = new ClientRequest(node(prevController).idString, time.milliseconds(), true,
requestHeader, controlledShutdownRequest, null)
- val clientResponse = networkClient.blockingSendAndReceive(request, controlledShutdownRequest)
+ val clientResponse = networkClient.blockingSendAndReceive(request, controlledShutdownRequest)(time)
val shutdownResponse = clientResponse.responseBody.asInstanceOf[ControlledShutdownResponse]
if (shutdownResponse.errorCode == Errors.NONE.code && shutdownResponse.partitionsRemaining.isEmpty) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index d2ec200..af64ffe 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -36,8 +36,8 @@ import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{LeaderAndIsrRequest, PartitionState, StopReplicaRequest, UpdateMetadataRequest}
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
+import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.requests.FetchRequest.PartitionData
-import org.apache.kafka.common.utils.{Time => JTime}
import scala.collection._
import scala.collection.JavaConverters._
@@ -102,7 +102,6 @@ object ReplicaManager {
class ReplicaManager(val config: KafkaConfig,
metrics: Metrics,
time: Time,
- jTime: JTime,
val zkUtils: ZkUtils,
scheduler: Scheduler,
val logManager: LogManager,
@@ -116,7 +115,7 @@ class ReplicaManager(val config: KafkaConfig,
new Partition(t, p, time, this)
})
private val replicaStateChangeLock = new Object
- val replicaFetcherManager = new ReplicaFetcherManager(config, this, metrics, jTime, threadNamePrefix, quotaManager)
+ val replicaFetcherManager = new ReplicaFetcherManager(config, this, metrics, time, threadNamePrefix, quotaManager)
private val highWatermarkCheckPointThreadStarted = new AtomicBoolean(false)
val highWatermarkCheckpoints = config.logDirs.map(dir => (new File(dir).getAbsolutePath, new OffsetCheckpoint(new File(dir, ReplicaManager.HighWatermarkFilename)))).toMap
private var hwThreadInitialized = false
@@ -326,9 +325,9 @@ class ReplicaManager(val config: KafkaConfig,
responseCallback: Map[TopicPartition, PartitionResponse] => Unit) {
if (isValidRequiredAcks(requiredAcks)) {
- val sTime = SystemTime.milliseconds
+ val sTime = time.milliseconds
val localProduceResults = appendToLocalLog(internalTopicsAllowed, messagesPerPartition, requiredAcks)
- debug("Produce to local log in %d ms".format(SystemTime.milliseconds - sTime))
+ debug("Produce to local log in %d ms".format(time.milliseconds - sTime))
val produceStatus = localProduceResults.map { case (topicPartition, result) =>
topicPartition ->
http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
index 73e7210..d9e2b5b 100755
--- a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
+++ b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
@@ -18,12 +18,13 @@ package kafka.server
import kafka.utils.ZkUtils._
import kafka.utils.CoreUtils._
-import kafka.utils.{Json, SystemTime, Logging, ZKCheckedEphemeral}
+import kafka.utils.{Json, Logging, ZKCheckedEphemeral}
import org.I0Itec.zkclient.exception.ZkNodeExistsException
import org.I0Itec.zkclient.IZkDataListener
import kafka.controller.ControllerContext
import kafka.controller.KafkaController
import org.apache.kafka.common.security.JaasUtils
+import org.apache.kafka.common.utils.Time
/**
* This class handles zookeeper based leader election based on an ephemeral path. The election module does not handle
@@ -35,7 +36,8 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext,
electionPath: String,
onBecomingLeader: () => Unit,
onResigningAsLeader: () => Unit,
- brokerId: Int)
+ brokerId: Int,
+ time: Time)
extends LeaderElector with Logging {
var leaderId = -1
// create the election path in ZK, if one does not exist
@@ -59,7 +61,7 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext,
}
def elect: Boolean = {
- val timestamp = SystemTime.milliseconds.toString
+ val timestamp = time.milliseconds.toString
val electString = Json.encode(Map("version" -> 1, "brokerid" -> brokerId, "timestamp" -> timestamp))
leaderId = getControllerID
http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
index 01d3aa8..479b43c 100644
--- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
+++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
@@ -19,18 +19,22 @@ package kafka.tools
import joptsimple.OptionParser
import kafka.cluster.BrokerEndPoint
-import kafka.message.{MessageSet, MessageAndOffset, ByteBufferMessageSet}
+import kafka.message.{ByteBufferMessageSet, MessageAndOffset, MessageSet}
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicReference
+
import kafka.client.ClientUtils
-import java.util.regex.{PatternSyntaxException, Pattern}
+import java.util.regex.{Pattern, PatternSyntaxException}
+
import kafka.api._
import java.text.SimpleDateFormat
import java.util.Date
+
import kafka.common.TopicAndPartition
import kafka.utils._
-import kafka.consumer.{ConsumerConfig, Whitelist, SimpleConsumer}
+import kafka.consumer.{ConsumerConfig, SimpleConsumer, Whitelist}
import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.utils.Time
/**
* For verifying the consistency among replicas.
@@ -59,7 +63,7 @@ object ReplicaVerificationTool extends Logging {
val dateFormat = new SimpleDateFormat(dateFormatString)
def getCurrentTimeString() = {
- ReplicaVerificationTool.dateFormat.format(new Date(SystemTime.milliseconds))
+ ReplicaVerificationTool.dateFormat.format(new Date(Time.SYSTEM.milliseconds))
}
def main(args: Array[String]): Unit = {
@@ -210,7 +214,7 @@ private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPa
private val messageSetCache = new Pool[TopicAndPartition, Pool[Int, FetchResponsePartitionData]]
private val fetcherBarrier = new AtomicReference(new CountDownLatch(expectedNumFetchers))
private val verificationBarrier = new AtomicReference(new CountDownLatch(1))
- @volatile private var lastReportTime = SystemTime.milliseconds
+ @volatile private var lastReportTime = Time.SYSTEM.milliseconds
private var maxLag: Long = -1L
private var offsetWithMaxLag: Long = -1L
private var maxLagTopicAndPartition: TopicAndPartition = null
@@ -331,7 +335,7 @@ private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPa
}
fetchResponsePerReplica.clear()
}
- val currentTimeMs = SystemTime.milliseconds
+ val currentTimeMs = Time.SYSTEM.milliseconds
if (currentTimeMs - lastReportTime > reportInterval) {
println(ReplicaVerificationTool.dateFormat.format(new Date(currentTimeMs)) + ": max lag is "
+ maxLag + " for partition " + maxLagTopicAndPartition + " at offset " + offsetWithMaxLag
http://git-wip-us.apache.org/repos/asf/kafka/blob/128d0ff9/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala b/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala
index 3abbc40..69b6ee8 100644
--- a/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala
+++ b/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala
@@ -19,11 +19,13 @@ package kafka.tools
import java.net.URI
import java.text.SimpleDateFormat
-import kafka.api.{PartitionOffsetRequestInfo, FetchRequestBuilder, OffsetRequest}
+
+import kafka.api.{FetchRequestBuilder, OffsetRequest, PartitionOffsetRequestInfo}
import kafka.consumer.SimpleConsumer
import kafka.utils._
import org.apache.log4j.Logger
import kafka.common.TopicAndPartition
+import org.apache.kafka.common.utils.Time
/**
@@ -96,7 +98,7 @@ object SimpleConsumerPerformance {
(totalBytesRead*1.0)/(1024*1024), totalMBRead/elapsed,
totalMessagesRead, (totalMessagesRead-lastMessagesRead)/elapsed))
}
- lastReportTime = SystemTime.milliseconds
+ lastReportTime = Time.SYSTEM.milliseconds
lastBytesRead = totalBytesRead
lastMessagesRead = totalMessagesRead
consumedInterval = 0