Posted to commits@kafka.apache.org by jk...@apache.org on 2014/07/01 22:23:10 UTC
git commit: KAFKA-1498 Misc. producer performance enhancements. Patch from Guozhang.
Repository: kafka
Updated Branches:
refs/heads/trunk c4b95641e -> f1c6e97d7
KAFKA-1498 Misc. producer performance enhancements. Patch from Guozhang.
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f1c6e97d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f1c6e97d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f1c6e97d
Branch: refs/heads/trunk
Commit: f1c6e97d718581566d037a48640ac3d869d1f15a
Parents: c4b9564
Author: Jay Kreps <ja...@gmail.com>
Authored: Tue Jul 1 13:21:36 2014 -0700
Committer: Jay Kreps <ja...@gmail.com>
Committed: Tue Jul 1 13:21:36 2014 -0700
----------------------------------------------------------------------
.../org/apache/kafka/clients/NetworkClient.java | 20 +++---
.../kafka/clients/producer/KafkaProducer.java | 16 +++--
.../clients/producer/internals/Metadata.java | 15 ++--
.../producer/internals/RecordAccumulator.java | 76 ++++++++++++++------
.../clients/producer/internals/Sender.java | 26 ++++---
.../kafka/common/record/MemoryRecords.java | 26 +++++--
.../apache/kafka/clients/NetworkClientTest.java | 2 +-
.../kafka/clients/producer/MetadataTest.java | 10 +--
.../clients/producer/RecordAccumulatorTest.java | 14 ++--
.../kafka/clients/producer/SenderTest.java | 6 +-
.../main/scala/kafka/tools/MirrorMaker.scala | 63 ++++++++--------
.../kafka/api/ProducerFailureHandlingTest.scala | 25 ++++---
.../unit/kafka/network/SocketServerTest.scala | 8 ++-
13 files changed, 187 insertions(+), 120 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/f1c6e97d/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 522881c..d21f922 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -116,7 +116,7 @@ public class NetworkClient implements KafkaClient {
/**
* Check if the node with the given id is ready to send more requests.
- * @param nodeId The node id
+ * @param node The given node id
* @param now The current time in ms
* @return true if the node is ready
*/
@@ -126,8 +126,8 @@ public class NetworkClient implements KafkaClient {
}
private boolean isReady(int node, long now) {
- if (this.metadata.needsUpdate(now))
- // if we need to update our metadata declare all requests unready to metadata requests first priority
+ if (!this.metadataFetchInProgress && this.metadata.timeToNextUpdate(now) == 0)
+ // if we need to update our metadata now, declare all requests unready to make metadata requests the first priority
return false;
else
// otherwise we are ready if we are connected and can send more requests
@@ -144,9 +144,12 @@ public class NetworkClient implements KafkaClient {
*/
@Override
public List<ClientResponse> poll(List<ClientRequest> requests, long timeout, long now) {
- // should we update our metadata?
List<NetworkSend> sends = new ArrayList<NetworkSend>();
- maybeUpdateMetadata(sends, now);
+
+ // should we update our metadata?
+ long metadataTimeout = metadata.timeToNextUpdate(now);
+ if (!this.metadataFetchInProgress && metadataTimeout == 0)
+ maybeUpdateMetadata(sends, now);
for (int i = 0; i < requests.size(); i++) {
ClientRequest request = requests.get(i);
@@ -160,7 +163,7 @@ public class NetworkClient implements KafkaClient {
// do the I/O
try {
- this.selector.poll(timeout, sends);
+ this.selector.poll(Math.min(timeout, metadataTimeout), sends);
} catch (IOException e) {
log.error("Unexpected error during I/O in producer network thread", e);
}
@@ -340,12 +343,9 @@ public class NetworkClient implements KafkaClient {
}
/**
- * Add a metadata request to the list of sends if we need to make one
+ * Add a metadata request to the list of sends if we can make one
*/
private void maybeUpdateMetadata(List<NetworkSend> sends, long now) {
- if (this.metadataFetchInProgress || !metadata.needsUpdate(now))
- return;
-
Node node = this.leastLoadedNode(now);
if (node == null)
return;
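Taken together, the NetworkClient changes do two things: a metadata update that is due wins over produce requests (isReady() reports false until the update goes out), and the selector's sleep is capped by the metadata deadline:

    selector.poll(Math.min(timeout, metadataTimeout), sends)
    e.g. timeout = 100 ms, metadata due in 30 ms  ->  block at most 30 ms

so a pending metadata refresh can never be delayed by a long produce-side poll (timings above are illustrative).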
http://git-wip-us.apache.org/repos/asf/kafka/blob/f1c6e97d/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 00775ab..d85ca30 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -22,7 +22,6 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.NetworkClient;
-import org.apache.kafka.clients.producer.internals.FutureRecordMetadata;
import org.apache.kafka.clients.producer.internals.Metadata;
import org.apache.kafka.clients.producer.internals.Partitioner;
import org.apache.kafka.clients.producer.internals.RecordAccumulator;
@@ -232,10 +231,14 @@ public class KafkaProducer implements Producer {
ensureValidRecordSize(serializedSize);
TopicPartition tp = new TopicPartition(record.topic(), partition);
log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
- FutureRecordMetadata future = accumulator.append(tp, record.key(), record.value(), compressionType, callback);
- this.sender.wakeup();
- return future;
- // For API exceptions return them in the future;
+ RecordAccumulator.RecordAppendResult result = accumulator.append(tp, record.key(), record.value(), compressionType, callback);
+ if (result.batchIsFull || result.newBatchCreated) {
+ log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
+ this.sender.wakeup();
+ }
+ return result.future;
+ // Handle exceptions and record the errors:
+ // for API exceptions return them in the future,
// for other exceptions throw directly
} catch (ApiException e) {
log.debug("Exception occurred during message send:", e);
@@ -246,6 +249,9 @@ public class KafkaProducer implements Producer {
} catch (InterruptedException e) {
this.errors.record();
throw new KafkaException(e);
+ } catch (KafkaException e) {
+ this.errors.record();
+ throw e;
}
}
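From the caller's side the send path is unchanged: send() still hands back a future immediately. What changes is that the sender's I/O thread is woken only when the append actually made a batch sendable. A minimal usage sketch (topic name, broker address, and config keys are illustrative, not taken from this patch):

    import java.util.Properties;
    import java.util.concurrent.Future;
    import org.apache.kafka.clients.producer.*;

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    KafkaProducer producer = new KafkaProducer(props);
    ProducerRecord record = new ProducerRecord("my-topic", "key".getBytes(), "value".getBytes());
    // wakeup() fires only if this append filled a batch or started a new one,
    // so appends into a partially filled batch no longer interrupt the I/O thread
    Future<RecordMetadata> future = producer.send(record);
    RecordMetadata metadata = future.get(); // blocks until the broker acks
    producer.close();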
http://git-wip-us.apache.org/repos/asf/kafka/blob/f1c6e97d/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
index 57bc285..8890aa2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
@@ -101,15 +101,14 @@ public final class Metadata {
}
/**
- * Does the current cluster info need to be updated? An update is needed if it has been at least refreshBackoffMs
- * since our last update and either (1) an update has been requested or (2) the current metadata has expired (more
- * than metadataExpireMs has passed since the last refresh)
+ * The next time to update the cluster info is the maximum of the time the current info will expire
+ * and the time the current info can be updated (i.e. the backoff time has elapsed); if an update has
+ * been requested then the expiry time is now
*/
- public synchronized boolean needsUpdate(long now) {
- long msSinceLastUpdate = now - this.lastRefreshMs;
- boolean updateAllowed = msSinceLastUpdate >= this.refreshBackoffMs;
- boolean updateNeeded = this.forceUpdate || msSinceLastUpdate >= this.metadataExpireMs;
- return updateAllowed && updateNeeded;
+ public synchronized long timeToNextUpdate(long nowMs) {
+ long timeToExpire = forceUpdate ? 0 : Math.max(this.lastRefreshMs + this.metadataExpireMs - nowMs, 0);
+ long timeToAllowUpdate = this.lastRefreshMs + this.refreshBackoffMs - nowMs;
+ return Math.max(timeToExpire, timeToAllowUpdate);
}
/**
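A worked example of the new contract, where a return value of 0 corresponds to the old needsUpdate(now) == true (all values illustrative): assume lastRefreshMs = 1000, refreshBackoffMs = 100, metadataExpireMs = 5000.

    at nowMs = 1050, no forced update:
      timeToExpire      = max(1000 + 5000 - 1050, 0) = 4950
      timeToAllowUpdate = 1000 + 100 - 1050          = 50
      timeToNextUpdate  = max(4950, 50)              = 4950   (info still fresh)
    after forceUpdate(), still at nowMs = 1050:
      timeToExpire      = 0
      timeToNextUpdate  = max(0, 50)                 = 50     (wait out the backoff)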
http://git-wip-us.apache.org/repos/asf/kafka/blob/f1c6e97d/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 1ed3c28..c5d4700 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -120,27 +120,29 @@ public final class RecordAccumulator {
}
/**
- * Add a record to the accumulator.
+ * Add a record to the accumulator, return the append result
* <p>
- * This method will block if sufficient memory isn't available for the record unless blocking has been disabled.
- *
+ * The append result will contain the future metadata, plus flags for whether the appended batch is full or a new batch was created
+ * <p>
+ *
* @param tp The topic/partition to which this record is being sent
* @param key The key for the record
* @param value The value for the record
* @param compression The compression codec for the record
* @param callback The user-supplied callback to execute when the request is complete
*/
- public FutureRecordMetadata append(TopicPartition tp, byte[] key, byte[] value, CompressionType compression, Callback callback) throws InterruptedException {
+ public RecordAppendResult append(TopicPartition tp, byte[] key, byte[] value, CompressionType compression, Callback callback) throws InterruptedException {
if (closed)
throw new IllegalStateException("Cannot send after the producer is closed.");
// check if we have an in-progress batch
Deque<RecordBatch> dq = dequeFor(tp);
synchronized (dq) {
- RecordBatch batch = dq.peekLast();
- if (batch != null) {
- FutureRecordMetadata future = batch.tryAppend(key, value, callback);
- if (future != null)
- return future;
+ RecordBatch last = dq.peekLast();
+ if (last != null) {
+ FutureRecordMetadata future = last.tryAppend(key, value, callback);
+ if (future != null) {
+ return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false);
+ }
}
}
@@ -156,15 +158,15 @@ public final class RecordAccumulator {
// Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen
// often...
free.deallocate(buffer);
- return future;
+ return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false);
}
}
- MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression);
+ MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
FutureRecordMetadata future = Utils.notNull(batch.tryAppend(key, value, callback));
dq.addLast(batch);
- return future;
+ return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);
}
}
@@ -181,7 +183,9 @@ public final class RecordAccumulator {
}
/**
- * Get a list of nodes whose partitions are ready to be sent.
+ * Get a list of nodes whose partitions are ready to be sent, and the earliest time at which any partition will be
+ * ready if no partitions are ready yet; if the ready-nodes list is non-empty the timeout value will be 0. Also
+ * return a flag indicating whether any of the accumulated partition batches have an unknown leader.
* <p>
* A destination node is ready to send data if ANY one of its partition is not backing off the send and ANY of the
* following are true :
@@ -193,31 +197,39 @@ public final class RecordAccumulator {
* <li>The accumulator has been closed
* </ol>
*/
- public Set<Node> ready(Cluster cluster, long now) {
+ public ReadyCheckResult ready(Cluster cluster, long nowMs) {
Set<Node> readyNodes = new HashSet<Node>();
- boolean exhausted = this.free.queued() > 0;
+ long nextReadyCheckDelayMs = Long.MAX_VALUE;
+ boolean unknownLeadersExist = false;
+ boolean exhausted = this.free.queued() > 0;
for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) {
TopicPartition part = entry.getKey();
Deque<RecordBatch> deque = entry.getValue();
- // if the leader is unknown use an Unknown node placeholder
+
Node leader = cluster.leaderFor(part);
- if (!readyNodes.contains(leader)) {
+ if (leader == null) {
+ unknownLeadersExist = true;
+ } else if (!readyNodes.contains(leader)) {
synchronized (deque) {
RecordBatch batch = deque.peekFirst();
if (batch != null) {
- boolean backingOff = batch.attempts > 0 && batch.lastAttemptMs + retryBackoffMs > now;
+ boolean backingOff = batch.attempts > 0 && batch.lastAttemptMs + retryBackoffMs > nowMs;
+ long waitedTimeMs = nowMs - batch.lastAttemptMs;
+ long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
+ long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
boolean full = deque.size() > 1 || batch.records.isFull();
- boolean expired = now - batch.createdMs >= lingerMs;
+ boolean expired = waitedTimeMs >= lingerMs;
boolean sendable = full || expired || exhausted || closed;
if (sendable && !backingOff)
readyNodes.add(leader);
+ nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
}
}
}
}
- return readyNodes;
+ return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeadersExist);
}
/**
@@ -311,4 +323,28 @@ public final class RecordAccumulator {
this.closed = true;
}
+
+ public final static class RecordAppendResult {
+ public final FutureRecordMetadata future;
+ public final boolean batchIsFull;
+ public final boolean newBatchCreated;
+
+ public RecordAppendResult(FutureRecordMetadata future, boolean batchIsFull, boolean newBatchCreated) {
+ this.future = future;
+ this.batchIsFull = batchIsFull;
+ this.newBatchCreated = newBatchCreated;
+ }
+ }
+
+ public final static class ReadyCheckResult {
+ public final Set<Node> readyNodes;
+ public final long nextReadyCheckDelayMs;
+ public final boolean unknownLeadersExist;
+
+ public ReadyCheckResult(Set<Node> readyNodes, long nextReadyCheckDelayMs, boolean unknownLeadersExist) {
+ this.readyNodes = readyNodes;
+ this.nextReadyCheckDelayMs = nextReadyCheckDelayMs;
+ this.unknownLeadersExist = unknownLeadersExist;
+ }
+ }
}
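To see where nextReadyCheckDelayMs comes from, take a single partition holding one non-full batch with lingerMs = 10 that was last attempted 4 ms ago, with no retries pending (values illustrative):

    backingOff   = false                        (attempts == 0)
    waitedTimeMs = 4
    timeToWaitMs = lingerMs = 10
    timeLeftMs   = max(10 - 4, 0) = 6
    full / expired / exhausted / closed  ->  all false, so not sendable

The leader is therefore left out of readyNodes, and nextReadyCheckDelayMs comes back as 6 ms: the sender can sleep exactly until the linger window closes instead of polling on a fixed interval.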
http://git-wip-us.apache.org/repos/asf/kafka/blob/f1c6e97d/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 6fb5b82..52d209b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -17,7 +17,6 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.ClientResponse;
@@ -123,13 +122,13 @@ public class Sender implements Runnable {
// okay we stopped accepting requests but there may still be
// requests in the accumulator or waiting for acknowledgment,
// wait until these are completed.
- do {
+ while (this.accumulator.hasUnsent() || this.client.inFlightRequestCount() > 0) {
try {
run(time.milliseconds());
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
- } while (this.accumulator.hasUnsent() || this.client.inFlightRequestCount() > 0);
+ }
this.client.close();
@@ -144,10 +143,14 @@ public class Sender implements Runnable {
public void run(long now) {
Cluster cluster = metadata.fetch();
// get the list of partitions with data ready to send
- Set<Node> ready = this.accumulator.ready(cluster, now);
+ RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
+
+ // if there are any partitions whose leaders are not known yet, force metadata update
+ if (result.unknownLeadersExist)
+ this.metadata.forceUpdate();
// remove any nodes we aren't ready to send to
- Iterator<Node> iter = ready.iterator();
+ Iterator<Node> iter = result.readyNodes.iterator();
while (iter.hasNext()) {
Node node = iter.next();
if (!this.client.ready(node, now))
@@ -155,16 +158,20 @@ public class Sender implements Runnable {
}
// create produce requests
- Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster, ready, this.maxRequestSize, now);
+ Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
List<ClientRequest> requests = createProduceRequests(batches, now);
sensors.updateProduceRequestMetrics(requests);
- if (ready.size() > 0) {
- log.trace("Nodes with data ready to send: {}", ready);
+ if (result.readyNodes.size() > 0) {
+ log.trace("Nodes with data ready to send: {}", result.readyNodes);
log.trace("Created {} produce requests: {}", requests.size(), requests);
}
- List<ClientResponse> responses = this.client.poll(requests, 100L, now);
+ // if some partitions are already ready to be sent, the select time will be 0;
+ // otherwise if some partition has data accumulated but is not yet ready,
+ // the select time will be the difference between now and its linger expiry time;
+ // otherwise the select time will be the difference between now and the metadata expiry time;
+ List<ClientResponse> responses = this.client.poll(requests, result.nextReadyCheckDelayMs, now);
for (ClientResponse response : responses) {
if (response.wasDisconnected())
handleDisconnect(response, now);
@@ -307,6 +314,7 @@ public class Sender implements Runnable {
this.batchSizeSensor = metrics.sensor("batch-size");
this.batchSizeSensor.add("batch-size-avg", "The average number of bytes sent per partition per-request.", new Avg());
+ this.batchSizeSensor.add("batch-size-max", "The max number of bytes sent per partition per-request.", new Max());
this.compressionRateSensor = metrics.sensor("compression-rate");
this.compressionRateSensor.add("compression-rate-avg", "The average compression rate of record batches.", new Avg());
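Putting the two timeouts together: the accumulator's delay flows into poll(), where NetworkClient further caps it by the metadata deadline. Reusing the numbers from the two worked examples above:

    select time = min(nextReadyCheckDelayMs, metadataTimeout)
                = min(6, 4950) = 6 ms

so the network thread sleeps just long enough for the linger window to close, replacing the previous hard-coded 100 ms poll.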
http://git-wip-us.apache.org/repos/asf/kafka/blob/f1c6e97d/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index 759f577..040e5b9 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -29,13 +29,15 @@ public class MemoryRecords implements Records {
private final Compressor compressor;
private final int capacity;
+ private final int sizeLimit;
private ByteBuffer buffer;
private boolean writable;
// Construct a writable memory records
- private MemoryRecords(ByteBuffer buffer, CompressionType type, boolean writable) {
+ private MemoryRecords(ByteBuffer buffer, CompressionType type, boolean writable, int sizeLimit) {
this.writable = writable;
this.capacity = buffer.capacity();
+ this.sizeLimit = sizeLimit;
if (this.writable) {
this.buffer = null;
this.compressor = new Compressor(buffer, type);
@@ -45,12 +47,16 @@ public class MemoryRecords implements Records {
}
}
+ public static MemoryRecords emptyRecords(ByteBuffer buffer, CompressionType type, int capacity) {
+ return new MemoryRecords(buffer, type, true, capacity);
+ }
+
public static MemoryRecords emptyRecords(ByteBuffer buffer, CompressionType type) {
- return new MemoryRecords(buffer, type, true);
+ return emptyRecords(buffer, type, buffer.capacity());
}
public static MemoryRecords iterableRecords(ByteBuffer buffer) {
- return new MemoryRecords(buffer, CompressionType.NONE, false);
+ return new MemoryRecords(buffer, CompressionType.NONE, false, buffer.capacity());
}
/**
@@ -88,14 +94,22 @@ public class MemoryRecords implements Records {
* Note that the return value is based on the estimate of the bytes written to the compressor, which may not be
* accurate if compression is really used. When this happens, the following append may cause dynamic buffer
* re-allocation in the underlying byte buffer stream.
+ *
+ * Also note that besides the records' capacity, there is also a size limit for the batch. This size limit may be
+ * smaller than the capacity (e.g. when appending a single message whose size is larger than the batch size, the
+ * capacity will be the message size, but the size limit will still be the batch size), and once the records' size has
+ * exceeded this limit we also mark the records as full.
*/
public boolean hasRoomFor(byte[] key, byte[] value) {
- return this.writable && this.capacity >= this.compressor.estimatedBytesWritten() + Records.LOG_OVERHEAD +
- Record.recordSize(key, value);
+ return this.writable &&
+ this.capacity >= this.compressor.estimatedBytesWritten() + Records.LOG_OVERHEAD + Record.recordSize(key, value) &&
+ this.sizeLimit >= this.compressor.estimatedBytesWritten();
}
public boolean isFull() {
- return !this.writable || this.capacity <= this.compressor.estimatedBytesWritten();
+ return !this.writable ||
+ this.capacity <= this.compressor.estimatedBytesWritten() ||
+ this.sizeLimit <= this.compressor.estimatedBytesWritten();
}
/**
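The capacity/sizeLimit split matters for oversized messages: the accumulator allocates a buffer big enough for the single record, but the batch should still read as full so nothing else is packed behind it. A sketch against the factory methods in this diff (buffer sizes illustrative):

    import java.nio.ByteBuffer;
    import org.apache.kafka.common.record.CompressionType;
    import org.apache.kafka.common.record.MemoryRecords;

    // normal case: buffer capacity equals the configured batch size
    MemoryRecords normal = MemoryRecords.emptyRecords(
        ByteBuffer.allocate(16384), CompressionType.NONE, 16384);

    // oversized case: a ~100 KB record gets a 100 KB buffer, but the size limit
    // stays at the 16 KB batch size, so isFull() is true after the one append
    MemoryRecords oversized = MemoryRecords.emptyRecords(
        ByteBuffer.allocate(102400), CompressionType.NONE, 16384);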
http://git-wip-us.apache.org/repos/asf/kafka/blob/f1c6e97d/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index 6a3cdcc..2f98192 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -54,7 +54,7 @@ public class NetworkClientTest {
client.poll(reqs, 1, time.milliseconds());
selector.clear();
assertFalse("After we forced the disconnection the client is no longer ready.", client.ready(node, time.milliseconds()));
- assertTrue("Metadata should get updated.", metadata.needsUpdate(time.milliseconds()));
+ assertTrue("Metadata should get updated.", metadata.timeToNextUpdate(time.milliseconds()) == 0);
}
@Test(expected = IllegalStateException.class)
http://git-wip-us.apache.org/repos/asf/kafka/blob/f1c6e97d/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java
index 8b4ac0f..0d7d04c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java
@@ -30,11 +30,11 @@ public class MetadataTest {
public void testMetadata() throws Exception {
long time = 0;
metadata.update(Cluster.empty(), time);
- assertFalse("No update needed.", metadata.needsUpdate(time));
+ assertFalse("No update needed.", metadata.timeToNextUpdate(time) == 0);
metadata.forceUpdate();
- assertFalse("Still no updated needed due to backoff", metadata.needsUpdate(time));
+ assertFalse("Still no updated needed due to backoff", metadata.timeToNextUpdate(time) == 0);
time += refreshBackoffMs;
- assertTrue("Update needed now that backoff time expired", metadata.needsUpdate(time));
+ assertTrue("Update needed now that backoff time expired", metadata.timeToNextUpdate(time) == 0);
String topic = "my-topic";
Thread t1 = asyncFetch(topic);
Thread t2 = asyncFetch(topic);
@@ -43,9 +43,9 @@ public class MetadataTest {
metadata.update(TestUtils.singletonCluster(topic, 1), time);
t1.join();
t2.join();
- assertFalse("No update needed.", metadata.needsUpdate(time));
+ assertFalse("No update needed.", metadata.timeToNextUpdate(time) == 0);
time += metadataExpireMs;
- assertTrue("Update needed due to stale metadata.", metadata.needsUpdate(time));
+ assertTrue("Update needed due to stale metadata.", metadata.timeToNextUpdate(time) == 0);
}
private Thread asyncFetch(final String topic) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/f1c6e97d/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
index 93b58d0..0762b35 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
@@ -62,10 +62,10 @@ public class RecordAccumulatorTest {
int appends = 1024 / msgSize;
for (int i = 0; i < appends; i++) {
accum.append(tp1, key, value, CompressionType.NONE, null);
- assertEquals("No partitions should be ready.", 0, accum.ready(cluster, now).size());
+ assertEquals("No partitions should be ready.", 0, accum.ready(cluster, now).readyNodes.size());
}
accum.append(tp1, key, value, CompressionType.NONE, null);
- assertEquals("Our partition's leader should be ready", Collections.singleton(node), accum.ready(cluster, time.milliseconds()));
+ assertEquals("Our partition's leader should be ready", Collections.singleton(node), accum.ready(cluster, time.milliseconds()).readyNodes);
List<RecordBatch> batches = accum.drain(cluster, Collections.singleton(node), Integer.MAX_VALUE, 0).get(node.id());
assertEquals(1, batches.size());
RecordBatch batch = batches.get(0);
@@ -83,7 +83,7 @@ public class RecordAccumulatorTest {
int batchSize = 512;
RecordAccumulator accum = new RecordAccumulator(batchSize, 10 * 1024, 0L, 100L, false, metrics, time);
accum.append(tp1, key, new byte[2 * batchSize], CompressionType.NONE, null);
- assertEquals("Our partition's leader should be ready", Collections.singleton(node), accum.ready(cluster, time.milliseconds()));
+ assertEquals("Our partition's leader should be ready", Collections.singleton(node), accum.ready(cluster, time.milliseconds()).readyNodes);
}
@Test
@@ -91,9 +91,9 @@ public class RecordAccumulatorTest {
long lingerMs = 10L;
RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, lingerMs, 100L, false, metrics, time);
accum.append(tp1, key, value, CompressionType.NONE, null);
- assertEquals("No partitions should be ready", 0, accum.ready(cluster, time.milliseconds()).size());
+ assertEquals("No partitions should be ready", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size());
time.sleep(10);
- assertEquals("Our partition's leader should be ready", Collections.singleton(node), accum.ready(cluster, time.milliseconds()));
+ assertEquals("Our partition's leader should be ready", Collections.singleton(node), accum.ready(cluster, time.milliseconds()).readyNodes);
List<RecordBatch> batches = accum.drain(cluster, Collections.singleton(node), Integer.MAX_VALUE, 0).get(node.id());
assertEquals(1, batches.size());
RecordBatch batch = batches.get(0);
@@ -113,7 +113,7 @@ public class RecordAccumulatorTest {
for (int i = 0; i < appends; i++)
accum.append(tp, key, value, CompressionType.NONE, null);
}
- assertEquals("Partition's leader should be ready", Collections.singleton(node), accum.ready(cluster, time.milliseconds()));
+ assertEquals("Partition's leader should be ready", Collections.singleton(node), accum.ready(cluster, time.milliseconds()).readyNodes);
List<RecordBatch> batches = accum.drain(cluster, Collections.singleton(node), 1024, 0).get(node.id());
assertEquals("But due to size bound only one partition should have been retrieved", 1, batches.size());
@@ -144,7 +144,7 @@ public class RecordAccumulatorTest {
int read = 0;
long now = time.milliseconds();
while (read < numThreads * msgs) {
- Set<Node> nodes = accum.ready(cluster, now);
+ Set<Node> nodes = accum.ready(cluster, now).readyNodes;
List<RecordBatch> batches = accum.drain(cluster, nodes, 5 * 1024, 0).get(node.id());
if (batches != null) {
for (RecordBatch batch : batches) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/f1c6e97d/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java
index 5489aca..ef2ca65 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java
@@ -69,7 +69,7 @@ public class SenderTest {
@Test
public void testSimple() throws Exception {
int offset = 0;
- Future<RecordMetadata> future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null);
+ Future<RecordMetadata> future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null).future;
sender.run(time.milliseconds()); // connect
sender.run(time.milliseconds()); // send produce request
assertEquals("We should have a single produce request in flight.", 1, client.inFlightRequestCount());
@@ -95,7 +95,7 @@ public class SenderTest {
new Metrics(),
time);
// do a successful retry
- Future<RecordMetadata> future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null);
+ Future<RecordMetadata> future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null).future;
sender.run(time.milliseconds()); // connect
sender.run(time.milliseconds()); // send produce request
assertEquals(1, client.inFlightRequestCount());
@@ -112,7 +112,7 @@ public class SenderTest {
assertEquals(offset, future.get().offset());
// do an unsuccessful retry
- future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null);
+ future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null).future;
sender.run(time.milliseconds()); // send produce request
for (int i = 0; i < maxRetries + 1; i++) {
client.disconnect(client.requests().peek().request().destination());
http://git-wip-us.apache.org/repos/asf/kafka/blob/f1c6e97d/core/src/main/scala/kafka/tools/MirrorMaker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index 7638391..4f06e34 100644
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -21,22 +21,22 @@ import kafka.utils.{SystemTime, Utils, CommandLineUtils, Logging}
import kafka.consumer._
import kafka.serializer._
import kafka.producer.{OldProducer, NewShinyProducer, BaseProducer}
+import kafka.metrics.KafkaMetricsGroup
+
import org.apache.kafka.clients.producer.ProducerRecord
-import scala.collection.mutable.ListBuffer
import scala.collection.JavaConversions._
-import java.util.concurrent.{TimeUnit, BlockingQueue, ArrayBlockingQueue, CountDownLatch}
-
import joptsimple.OptionParser
-import kafka.metrics.KafkaMetricsGroup
-import com.yammer.metrics.core.Gauge
+import java.util.Random
+import java.util.concurrent.atomic.AtomicInteger
+import java.util.concurrent.{TimeUnit, BlockingQueue, ArrayBlockingQueue, CountDownLatch}
object MirrorMaker extends Logging {
private var connectors: Seq[ZookeeperConsumerConnector] = null
private var consumerThreads: Seq[ConsumerThread] = null
- private var producerThreads: ListBuffer[ProducerThread] = null
+ private var producerThreads: Seq[ProducerThread] = null
private val shutdownMessage : ProducerRecord = new ProducerRecord("shutdown", "shutdown".getBytes)
@@ -138,13 +138,7 @@ object MirrorMaker extends Logging {
// create a data channel btw the consumers and the producers
val mirrorDataChannel = new DataChannel(bufferSize, numConsumers, numProducers)
- producerThreads = new ListBuffer[ProducerThread]()
- var producerIndex: Int = 1
- for(producer <- producers) {
- val producerThread = new ProducerThread(mirrorDataChannel, producer, producerIndex)
- producerThreads += producerThread
- producerIndex += 1
- }
+ producerThreads = producers.zipWithIndex.map(producerAndIndex => new ProducerThread(mirrorDataChannel, producerAndIndex._1, producerAndIndex._2))
val filterSpec = if (options.has(whitelistOpt))
new Whitelist(options.valueOf(whitelistOpt))
@@ -190,14 +184,11 @@ object MirrorMaker extends Logging {
class DataChannel(capacity: Int, numProducers: Int, numConsumers: Int) extends KafkaMetricsGroup {
- val queue = new ArrayBlockingQueue[ProducerRecord](capacity)
+ val queues = new Array[BlockingQueue[ProducerRecord]](numConsumers)
+ for (i <- 0 until numConsumers)
+ queues(i) = new ArrayBlockingQueue[ProducerRecord](capacity)
- newGauge(
- "MirrorMaker-DataChannel-Size",
- new Gauge[Int] {
- def value = queue.size
- }
- )
+ private val counter = new AtomicInteger(new Random().nextInt())
// We use a single meter for aggregated wait percentage for the data channel.
// Since meter is calculated as total_recorded_value / time_window and
@@ -205,23 +196,37 @@ object MirrorMaker extends Logging {
// time should be discounted by # threads.
private val waitPut = newMeter("MirrorMaker-DataChannel-WaitOnPut", "percent", TimeUnit.NANOSECONDS)
private val waitTake = newMeter("MirrorMaker-DataChannel-WaitOnTake", "percent", TimeUnit.NANOSECONDS)
+ private val channelSizeHist = newHistogram("MirrorMaker-DataChannel-Size")
def put(record: ProducerRecord) {
+ // If the key of the message is null, use round-robin to select a queue;
+ // otherwise pick the queue from the key's hash so that messages with the same key go to the same queue
+ val queueId =
+ if(record.key() != null) {
+ Utils.abs(java.util.Arrays.hashCode(record.key())) % numConsumers
+ } else {
+ Utils.abs(counter.getAndIncrement()) % numConsumers
+ }
+ val queue = queues(queueId)
+
var putSucceed = false
while (!putSucceed) {
val startPutTime = SystemTime.nanoseconds
putSucceed = queue.offer(record, 500, TimeUnit.MILLISECONDS)
waitPut.mark((SystemTime.nanoseconds - startPutTime) / numProducers)
}
+ channelSizeHist.update(queue.size)
}
- def take(): ProducerRecord = {
+ def take(queueId: Int): ProducerRecord = {
+ val queue = queues(queueId)
var data: ProducerRecord = null
while (data == null) {
val startTakeTime = SystemTime.nanoseconds
data = queue.poll(500, TimeUnit.MILLISECONDS)
waitTake.mark((SystemTime.nanoseconds - startTakeTime) / numConsumers)
}
+ channelSizeHist.update(queue.size)
data
}
}
@@ -242,18 +247,8 @@ object MirrorMaker extends Logging {
info("Starting mirror maker consumer thread " + threadName)
try {
for (msgAndMetadata <- stream) {
- // If the key of the message is empty, put it into the universal channel
- // Otherwise use a pre-assigned producer to send the message
- if (msgAndMetadata.key == null) {
- trace("Send the non-keyed message the producer channel.")
- val data = new ProducerRecord(msgAndMetadata.topic, msgAndMetadata.message)
- mirrorDataChannel.put(data)
- } else {
- val producerId = Utils.abs(java.util.Arrays.hashCode(msgAndMetadata.key)) % producers.size()
- trace("Send message with key %s to producer %d.".format(java.util.Arrays.toString(msgAndMetadata.key), producerId))
- val producer = producers(producerId)
- producer.send(msgAndMetadata.topic, msgAndMetadata.key, msgAndMetadata.message)
- }
+ val data = new ProducerRecord(msgAndMetadata.topic, msgAndMetadata.message)
+ mirrorDataChannel.put(data)
}
} catch {
case e: Throwable =>
@@ -287,7 +282,7 @@ object MirrorMaker extends Logging {
info("Starting mirror maker producer thread " + threadName)
try {
while (true) {
- val data: ProducerRecord = dataChannel.take
+ val data: ProducerRecord = dataChannel.take(threadId)
trace("Sending message with value size %d".format(data.value().size))
if(data eq shutdownMessage) {
info("Received shutdown message")
http://git-wip-us.apache.org/repos/asf/kafka/blob/f1c6e97d/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
index d146444..15fd5bc 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
@@ -21,7 +21,7 @@ import org.scalatest.junit.JUnit3Suite
import org.junit.Test
import org.junit.Assert._
-import java.util.Properties
+import java.util.{Random, Properties}
import java.lang.Integer
import java.util.concurrent.{TimeoutException, TimeUnit, ExecutionException}
@@ -76,8 +76,6 @@ class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness
producer1 = TestUtils.createNewProducer(brokerList, acks = 0, blockOnBufferFull = false, bufferSize = bufferSize);
producer2 = TestUtils.createNewProducer(brokerList, acks = 1, blockOnBufferFull = false, bufferSize = bufferSize)
producer3 = TestUtils.createNewProducer(brokerList, acks = -1, blockOnBufferFull = false, bufferSize = bufferSize)
- // producer with incorrect broker list
- producer4 = TestUtils.createNewProducer("localhost:8686,localhost:4242", acks = 1, blockOnBufferFull = false, bufferSize = bufferSize)
}
override def tearDown() {
@@ -150,6 +148,9 @@ class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness
// create topic
TestUtils.createTopic(zkClient, topic1, 1, 2, servers)
+ // producer with incorrect broker list
+ producer4 = TestUtils.createNewProducer("localhost:8686,localhost:4242", acks = 1, blockOnBufferFull = false, bufferSize = bufferSize)
+
// send a record with incorrect broker list
val record = new ProducerRecord(topic1, null, "key".getBytes, "value".getBytes)
intercept[ExecutionException] {
@@ -168,28 +169,32 @@ class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness
TestUtils.createTopic(zkClient, topic1, 1, 2, servers)
// first send a message to make sure the metadata is refreshed
- val record = new ProducerRecord(topic1, null, "key".getBytes, "value".getBytes)
- producer1.send(record).get
- producer2.send(record).get
+ val record1 = new ProducerRecord(topic1, null, "key".getBytes, "value".getBytes)
+ producer1.send(record1).get
+ producer2.send(record1).get
// stop IO threads and request handling, but leave networking operational
// any requests should be accepted and queue up, but not handled
server1.requestHandlerPool.shutdown()
server2.requestHandlerPool.shutdown()
- producer1.send(record).get(5000, TimeUnit.MILLISECONDS)
+ producer1.send(record1).get(5000, TimeUnit.MILLISECONDS)
intercept[TimeoutException] {
- producer2.send(record).get(5000, TimeUnit.MILLISECONDS)
+ producer2.send(record1).get(5000, TimeUnit.MILLISECONDS)
}
// TODO: expose producer configs after creating them
// send enough messages to get buffer full
- val tooManyRecords = bufferSize / ("key".getBytes.length + "value".getBytes.length)
+ val msgSize = 10000
+ val value = new Array[Byte](msgSize)
+ new Random().nextBytes(value)
+ val record2 = new ProducerRecord(topic1, null, "key".getBytes, value)
+ val tooManyRecords = bufferSize / ("key".getBytes.length + value.length)
intercept[KafkaException] {
for (i <- 1 to tooManyRecords)
- producer2.send(record)
+ producer2.send(record2)
}
// do not close produce2 since it will block
http://git-wip-us.apache.org/repos/asf/kafka/blob/f1c6e97d/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index 62fb02c..1c492de 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -128,11 +128,15 @@ class SocketServerTest extends JUnitSuite {
@Test(expected = classOf[IOException])
def testSocketsCloseOnShutdown() {
- // open a connection and then shutdown the server
+ // open a connection
val socket = connect()
+ val bytes = new Array[Byte](40)
+ // send a request first to make sure the connection has been picked up by the socket server
+ sendRequest(socket, 0, bytes)
+ processRequest(server.requestChannel)
+ // then shutdown the server
server.shutdown()
// doing a subsequent send should throw an exception as the connection should be closed.
- val bytes = new Array[Byte](10)
sendRequest(socket, 0, bytes)
}
}