Posted to commits@kafka.apache.org by jk...@apache.org on 2014/07/01 22:23:10 UTC

git commit: KAFKA-1498 Misc. producer performance enhancements. Patch from Guozhang.

Repository: kafka
Updated Branches:
  refs/heads/trunk c4b95641e -> f1c6e97d7


KAFKA-1498 Misc. producer performance enhancements. Patch from Guozhang.


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f1c6e97d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f1c6e97d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f1c6e97d

Branch: refs/heads/trunk
Commit: f1c6e97d718581566d037a48640ac3d869d1f15a
Parents: c4b9564
Author: Jay Kreps <ja...@gmail.com>
Authored: Tue Jul 1 13:21:36 2014 -0700
Committer: Jay Kreps <ja...@gmail.com>
Committed: Tue Jul 1 13:21:36 2014 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/clients/NetworkClient.java | 20 +++---
 .../kafka/clients/producer/KafkaProducer.java   | 16 +++--
 .../clients/producer/internals/Metadata.java    | 15 ++--
 .../producer/internals/RecordAccumulator.java   | 76 ++++++++++++++------
 .../clients/producer/internals/Sender.java      | 26 ++++---
 .../kafka/common/record/MemoryRecords.java      | 26 +++++--
 .../apache/kafka/clients/NetworkClientTest.java |  2 +-
 .../kafka/clients/producer/MetadataTest.java    | 10 +--
 .../clients/producer/RecordAccumulatorTest.java | 14 ++--
 .../kafka/clients/producer/SenderTest.java      |  6 +-
 .../main/scala/kafka/tools/MirrorMaker.scala    | 63 ++++++++--------
 .../kafka/api/ProducerFailureHandlingTest.scala | 25 ++++---
 .../unit/kafka/network/SocketServerTest.scala   |  8 ++-
 13 files changed, 187 insertions(+), 120 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f1c6e97d/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 522881c..d21f922 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -116,7 +116,7 @@ public class NetworkClient implements KafkaClient {
 
     /**
      * Check if the node with the given id is ready to send more requests.
-     * @param nodeId The node id
+     * @param node The given node id
      * @param now The current time in ms
      * @return true if the node is ready
      */
@@ -126,8 +126,8 @@ public class NetworkClient implements KafkaClient {
     }
 
     private boolean isReady(int node, long now) {
-        if (this.metadata.needsUpdate(now))
-            // if we need to update our metadata declare all requests unready to metadata requests first priority
+        if (!this.metadataFetchInProgress && this.metadata.timeToNextUpdate(now) == 0)
+            // if we need to update our metadata now, declare all requests unready so that metadata requests take first priority
             return false;
         else
             // otherwise we are ready if we are connected and can send more requests
@@ -144,9 +144,12 @@ public class NetworkClient implements KafkaClient {
      */
     @Override
     public List<ClientResponse> poll(List<ClientRequest> requests, long timeout, long now) {
-        // should we update our metadata?
         List<NetworkSend> sends = new ArrayList<NetworkSend>();
-        maybeUpdateMetadata(sends, now);
+
+        // should we update our metadata?
+        long metadataTimeout = metadata.timeToNextUpdate(now);
+        if (!this.metadataFetchInProgress && metadataTimeout == 0)
+            maybeUpdateMetadata(sends, now);
 
         for (int i = 0; i < requests.size(); i++) {
             ClientRequest request = requests.get(i);
@@ -160,7 +163,7 @@ public class NetworkClient implements KafkaClient {
 
         // do the I/O
         try {
-            this.selector.poll(timeout, sends);
+            this.selector.poll(Math.min(timeout, metadataTimeout), sends);
         } catch (IOException e) {
             log.error("Unexpected error during I/O in producer network thread", e);
         }
@@ -340,12 +343,9 @@ public class NetworkClient implements KafkaClient {
     }
 
     /**
-     * Add a metadata request to the list of sends if we need to make one
+     * Add a metadata request to the list of sends if we can make one
      */
     private void maybeUpdateMetadata(List<NetworkSend> sends, long now) {
-        if (this.metadataFetchInProgress || !metadata.needsUpdate(now))
-            return;
-
         Node node = this.leastLoadedNode(now);
         if (node == null)
             return;

http://git-wip-us.apache.org/repos/asf/kafka/blob/f1c6e97d/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 00775ab..d85ca30 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -22,7 +22,6 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.kafka.clients.NetworkClient;
-import org.apache.kafka.clients.producer.internals.FutureRecordMetadata;
 import org.apache.kafka.clients.producer.internals.Metadata;
 import org.apache.kafka.clients.producer.internals.Partitioner;
 import org.apache.kafka.clients.producer.internals.RecordAccumulator;
@@ -232,10 +231,14 @@ public class KafkaProducer implements Producer {
             ensureValidRecordSize(serializedSize);
             TopicPartition tp = new TopicPartition(record.topic(), partition);
             log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
-            FutureRecordMetadata future = accumulator.append(tp, record.key(), record.value(), compressionType, callback);
-            this.sender.wakeup();
-            return future;
-            // For API exceptions return them in the future;
+            RecordAccumulator.RecordAppendResult result = accumulator.append(tp, record.key(), record.value(), compressionType, callback);
+            if (result.batchIsFull || result.newBatchCreated) {
+                log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
+                this.sender.wakeup();
+            }
+            return result.future;
+            // Handle exceptions and record the errors;
+            // For API exceptions return them in the future,
             // for other exceptions throw directly
         } catch (ApiException e) {
             log.debug("Exception occurred during message send:", e);
@@ -246,6 +249,9 @@ public class KafkaProducer implements Producer {
         } catch (InterruptedException e) {
             this.errors.record();
             throw new KafkaException(e);
+        } catch (KafkaException e) {
+            this.errors.record();
+            throw e;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/f1c6e97d/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
index 57bc285..8890aa2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
@@ -101,15 +101,14 @@ public final class Metadata {
     }
 
     /**
-     * Does the current cluster info need to be updated? An update is needed if it has been at least refreshBackoffMs
-     * since our last update and either (1) an update has been requested or (2) the current metadata has expired (more
-     * than metadataExpireMs has passed since the last refresh)
+     * The next time to update the cluster info is the maximum of the time the current info will expire
+     * and the time the current info can be updated (i.e. backoff time has elapsed); if an update has
+     * been requested then the expiry time is now
      */
-    public synchronized boolean needsUpdate(long now) {
-        long msSinceLastUpdate = now - this.lastRefreshMs;
-        boolean updateAllowed = msSinceLastUpdate >= this.refreshBackoffMs;
-        boolean updateNeeded = this.forceUpdate || msSinceLastUpdate >= this.metadataExpireMs;
-        return updateAllowed && updateNeeded;
+    public synchronized long timeToNextUpdate(long nowMs) {
+        long timeToExpire = forceUpdate ? 0 : Math.max(this.lastRefreshMs + this.metadataExpireMs - nowMs, 0);
+        long timeToAllowUpdate = this.lastRefreshMs + this.refreshBackoffMs - nowMs;
+        return Math.max(timeToExpire, timeToAllowUpdate);
     }
 
     /**
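
For reference, a standalone sketch (not code from this patch) of the contract the new timeToNextUpdate() replaces needsUpdate() with; the field values in the comments are made-up examples:

    // Assumed example values: lastRefreshMs = 0, refreshBackoffMs = 100, metadataExpireMs = 60000
    static long timeToNextUpdate(long nowMs, long lastRefreshMs, long refreshBackoffMs,
                                 long metadataExpireMs, boolean forceUpdate) {
        // how long until the current cluster info is considered stale (0 right away if an update was requested)
        long timeToExpire = forceUpdate ? 0 : Math.max(lastRefreshMs + metadataExpireMs - nowMs, 0);
        // how long until the refresh backoff has elapsed (may be negative once it has passed)
        long timeToAllowUpdate = lastRefreshMs + refreshBackoffMs - nowMs;
        return Math.max(timeToExpire, timeToAllowUpdate);
    }
    // With forceUpdate == true at nowMs = 50 this returns 50 (still inside the backoff window);
    // at nowMs = 100 it returns 0, which corresponds to the old needsUpdate(now) == true case.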

http://git-wip-us.apache.org/repos/asf/kafka/blob/f1c6e97d/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 1ed3c28..c5d4700 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -120,27 +120,29 @@ public final class RecordAccumulator {
     }
 
     /**
-     * Add a record to the accumulator.
+     * Add a record to the accumulator, return the append result
      * <p>
-     * This method will block if sufficient memory isn't available for the record unless blocking has been disabled.
-     * 
+     * The append result will contain the future metadata, and flags for whether the appended batch is full or a new batch was created
+     * <p>
+     *
      * @param tp The topic/partition to which this record is being sent
      * @param key The key for the record
      * @param value The value for the record
      * @param compression The compression codec for the record
      * @param callback The user-supplied callback to execute when the request is complete
      */
-    public FutureRecordMetadata append(TopicPartition tp, byte[] key, byte[] value, CompressionType compression, Callback callback) throws InterruptedException {
+    public RecordAppendResult append(TopicPartition tp, byte[] key, byte[] value, CompressionType compression, Callback callback) throws InterruptedException {
         if (closed)
             throw new IllegalStateException("Cannot send after the producer is closed.");
         // check if we have an in-progress batch
         Deque<RecordBatch> dq = dequeFor(tp);
         synchronized (dq) {
-            RecordBatch batch = dq.peekLast();
-            if (batch != null) {
-                FutureRecordMetadata future = batch.tryAppend(key, value, callback);
-                if (future != null)
-                    return future;
+            RecordBatch last = dq.peekLast();
+            if (last != null) {
+                FutureRecordMetadata future = last.tryAppend(key, value, callback);
+                if (future != null) {
+                    return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false);
+                }
             }
         }
 
@@ -156,15 +158,15 @@ public final class RecordAccumulator {
                     // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen
                     // often...
                     free.deallocate(buffer);
-                    return future;
+                    return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false);
                 }
             }
-            MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression);
+            MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
             RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
             FutureRecordMetadata future = Utils.notNull(batch.tryAppend(key, value, callback));
 
             dq.addLast(batch);
-            return future;
+            return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);
         }
     }
 
@@ -181,7 +183,9 @@ public final class RecordAccumulator {
     }
 
     /**
-     * Get a list of nodes whose partitions are ready to be sent.
+     * Get a list of nodes whose partitions are ready to be sent, and the earliest time at which any non-ready
+     * partition will become ready; if the ready node set is non-empty the delay will be 0. Also return a flag
+     * indicating whether there are any unknown leaders for the accumulated partition batches.
      * <p>
      * A destination node is ready to send data if ANY one of its partition is not backing off the send and ANY of the
      * following are true :
@@ -193,31 +197,39 @@ public final class RecordAccumulator {
      * <li>The accumulator has been closed
      * </ol>
      */
-    public Set<Node> ready(Cluster cluster, long now) {
+    public ReadyCheckResult ready(Cluster cluster, long nowMs) {
         Set<Node> readyNodes = new HashSet<Node>();
-        boolean exhausted = this.free.queued() > 0;
+        long nextReadyCheckDelayMs = Long.MAX_VALUE;
+        boolean unknownLeadersExist = false;
 
+        boolean exhausted = this.free.queued() > 0;
         for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) {
             TopicPartition part = entry.getKey();
             Deque<RecordBatch> deque = entry.getValue();
-            // if the leader is unknown use an Unknown node placeholder
+
             Node leader = cluster.leaderFor(part);
-            if (!readyNodes.contains(leader)) {
+            if (leader == null) {
+                unknownLeadersExist = true;
+            } else if (!readyNodes.contains(leader)) {
                 synchronized (deque) {
                     RecordBatch batch = deque.peekFirst();
                     if (batch != null) {
-                        boolean backingOff = batch.attempts > 0 && batch.lastAttemptMs + retryBackoffMs > now;
+                        boolean backingOff = batch.attempts > 0 && batch.lastAttemptMs + retryBackoffMs > nowMs;
+                        long waitedTimeMs = nowMs - batch.lastAttemptMs;
+                        long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
+                        long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
                         boolean full = deque.size() > 1 || batch.records.isFull();
-                        boolean expired = now - batch.createdMs >= lingerMs;
+                        boolean expired = waitedTimeMs >= lingerMs;
                         boolean sendable = full || expired || exhausted || closed;
                         if (sendable && !backingOff)
                             readyNodes.add(leader);
+                        nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
                     }
                 }
             }
         }
 
-        return readyNodes;
+        return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeadersExist);
     }
 
     /**
@@ -311,4 +323,28 @@ public final class RecordAccumulator {
         this.closed = true;
     }
 
+
+    public final static class RecordAppendResult {
+        public final FutureRecordMetadata future;
+        public final boolean batchIsFull;
+        public final boolean newBatchCreated;
+
+        public RecordAppendResult(FutureRecordMetadata future, boolean batchIsFull, boolean newBatchCreated) {
+            this.future = future;
+            this.batchIsFull = batchIsFull;
+            this.newBatchCreated = newBatchCreated;
+        }
+    }
+
+    public final static class ReadyCheckResult {
+        public final Set<Node> readyNodes;
+        public final long nextReadyCheckDelayMs;
+        public final boolean unknownLeadersExist;
+
+        public ReadyCheckResult(Set<Node> readyNodes, long nextReadyCheckDelayMs, boolean unknownLeadersExist) {
+            this.readyNodes = readyNodes;
+            this.nextReadyCheckDelayMs = nextReadyCheckDelayMs;
+            this.unknownLeadersExist = unknownLeadersExist;
+        }
+    }
 }
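
As a rough worked example (not part of the patch) of how the per-partition delay that feeds nextReadyCheckDelayMs is computed inside ready(); all numbers below are assumptions:

    // Assumed values: lingerMs = 5, retryBackoffMs = 100, nowMs = 1002,
    // batch.attempts = 0, batch.lastAttemptMs = 1000
    boolean backingOff = false;                                     // attempts == 0, so no retry backoff
    long waitedTimeMs  = 1002L - 1000L;                             // 2 ms since the batch was created/last tried
    long timeToWaitMs  = backingOff ? 100L : 5L;                    // linger.ms applies to a fresh batch
    long timeLeftMs    = Math.max(timeToWaitMs - waitedTimeMs, 0);  // 3 ms until linger expiry
    // nextReadyCheckDelayMs is the minimum timeLeftMs over all partitions, so the sender
    // needs to wake up in at most 3 ms rather than polling on a fixed interval.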

http://git-wip-us.apache.org/repos/asf/kafka/blob/f1c6e97d/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 6fb5b82..52d209b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -17,7 +17,6 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 import org.apache.kafka.clients.ClientRequest;
 import org.apache.kafka.clients.ClientResponse;
@@ -123,13 +122,13 @@ public class Sender implements Runnable {
         // okay we stopped accepting requests but there may still be
         // requests in the accumulator or waiting for acknowledgment,
         // wait until these are completed.
-        do {
+        while (this.accumulator.hasUnsent() || this.client.inFlightRequestCount() > 0) {
             try {
                 run(time.milliseconds());
             } catch (Exception e) {
                 log.error("Uncaught error in kafka producer I/O thread: ", e);
             }
-        } while (this.accumulator.hasUnsent() || this.client.inFlightRequestCount() > 0);
+        }
 
         this.client.close();
 
@@ -144,10 +143,14 @@ public class Sender implements Runnable {
     public void run(long now) {
         Cluster cluster = metadata.fetch();
         // get the list of partitions with data ready to send
-        Set<Node> ready = this.accumulator.ready(cluster, now);
+        RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
+
+        // if there are any partitions whose leaders are not known yet, force metadata update
+        if (result.unknownLeadersExist)
+            this.metadata.forceUpdate();
 
         // remove any nodes we aren't ready to send to
-        Iterator<Node> iter = ready.iterator();
+        Iterator<Node> iter = result.readyNodes.iterator();
         while (iter.hasNext()) {
             Node node = iter.next();
             if (!this.client.ready(node, now))
@@ -155,16 +158,20 @@ public class Sender implements Runnable {
         }
 
         // create produce requests
-        Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster, ready, this.maxRequestSize, now);
+        Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
         List<ClientRequest> requests = createProduceRequests(batches, now);
         sensors.updateProduceRequestMetrics(requests);
 
-        if (ready.size() > 0) {
-            log.trace("Nodes with data ready to send: {}", ready);
+        if (result.readyNodes.size() > 0) {
+            log.trace("Nodes with data ready to send: {}", result.readyNodes);
             log.trace("Created {} produce requests: {}", requests.size(), requests);
         }
 
-        List<ClientResponse> responses = this.client.poll(requests, 100L, now);
+        // if some partitions are already ready to be sent, the select time will be 0;
+        // otherwise if some partition already has some data accumulated but is not ready yet,
+        // the select time will be the time difference between now and its linger expiry time;
+        // otherwise the select time will be the time difference between now and the metadata expiry time
+        List<ClientResponse> responses = this.client.poll(requests, result.nextReadyCheckDelayMs, now);
         for (ClientResponse response : responses) {
             if (response.wasDisconnected())
                 handleDisconnect(response, now);
@@ -307,6 +314,7 @@ public class Sender implements Runnable {
 
             this.batchSizeSensor = metrics.sensor("batch-size");
             this.batchSizeSensor.add("batch-size-avg", "The average number of bytes sent per partition per-request.", new Avg());
+            this.batchSizeSensor.add("batch-size-max", "The max number of bytes sent per partition per-request.", new Max());
 
             this.compressionRateSensor = metrics.sensor("compression-rate");
             this.compressionRateSensor.add("compression-rate-avg", "The average compression rate of record batches.", new Avg());
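
Taken together with the NetworkClient change above, the effective selector wait works out roughly as below (a sketch, not code from the patch; readyCheckResult is a hypothetical local name):

    // delay the Sender hands to NetworkClient.poll(): 0 if any partition is already sendable,
    // otherwise the time until the nearest linger/backoff expiry
    long timeout = readyCheckResult.nextReadyCheckDelayMs;
    // NetworkClient then caps it with the metadata refresh deadline
    long metadataTimeout = metadata.timeToNextUpdate(now);
    long selectorWait = Math.min(timeout, metadataTimeout);
    // so instead of the previous fixed 100 ms poll, the I/O thread blocks only until data is
    // sendable, a linger timer fires, or a metadata refresh is due (or it is woken explicitly).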

http://git-wip-us.apache.org/repos/asf/kafka/blob/f1c6e97d/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index 759f577..040e5b9 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -29,13 +29,15 @@ public class MemoryRecords implements Records {
 
     private final Compressor compressor;
     private final int capacity;
+    private final int sizeLimit;
     private ByteBuffer buffer;
     private boolean writable;
 
     // Construct a writable memory records
-    private MemoryRecords(ByteBuffer buffer, CompressionType type, boolean writable) {
+    private MemoryRecords(ByteBuffer buffer, CompressionType type, boolean writable, int sizeLimit) {
         this.writable = writable;
         this.capacity = buffer.capacity();
+        this.sizeLimit = sizeLimit;
         if (this.writable) {
             this.buffer = null;
             this.compressor = new Compressor(buffer, type);
@@ -45,12 +47,16 @@ public class MemoryRecords implements Records {
         }
     }
 
+    public static MemoryRecords emptyRecords(ByteBuffer buffer, CompressionType type, int capacity) {
+        return new MemoryRecords(buffer, type, true, capacity);
+    }
+
     public static MemoryRecords emptyRecords(ByteBuffer buffer, CompressionType type) {
-        return new MemoryRecords(buffer, type, true);
+        return emptyRecords(buffer, type, buffer.capacity());
     }
 
     public static MemoryRecords iterableRecords(ByteBuffer buffer) {
-        return new MemoryRecords(buffer, CompressionType.NONE, false);
+        return new MemoryRecords(buffer, CompressionType.NONE, false, buffer.capacity());
     }
 
     /**
@@ -88,14 +94,22 @@ public class MemoryRecords implements Records {
      * Note that the return value is based on the estimate of the bytes written to the compressor, which may not be
      * accurate if compression is really used. When this happens, the following append may cause dynamic buffer
      * re-allocation in the underlying byte buffer stream.
+     *
+     * Also note that besides the records' capacity, there is also a size limit for the batch. This size limit may be
+     * smaller than the capacity (e.g. when appending a single message whose size is larger than the batch size, the
+     * capacity will be the message size, but the size limit will still be the batch size), and when the records' size has
+     * exceeded this limit we also mark the records as full.
      */
     public boolean hasRoomFor(byte[] key, byte[] value) {
-        return this.writable && this.capacity >= this.compressor.estimatedBytesWritten() + Records.LOG_OVERHEAD +
-                                                 Record.recordSize(key, value);
+        return this.writable &&
+            this.capacity >= this.compressor.estimatedBytesWritten() + Records.LOG_OVERHEAD + Record.recordSize(key, value) &&
+            this.sizeLimit >= this.compressor.estimatedBytesWritten();
     }
 
     public boolean isFull() {
-        return !this.writable || this.capacity <= this.compressor.estimatedBytesWritten();
+        return !this.writable ||
+            this.capacity <= this.compressor.estimatedBytesWritten() ||
+            this.sizeLimit <= this.compressor.estimatedBytesWritten();
     }
 
     /**
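
A hedged sketch of how the accumulator is expected to use the new sizeLimit argument; the buffer sizes here are made-up and only the emptyRecords() overloads shown in this diff are assumed:

    // (classes from org.apache.kafka.common.record and java.nio)
    // Normal case: the buffer is exactly one batch, so capacity and size limit coincide.
    MemoryRecords normal = MemoryRecords.emptyRecords(ByteBuffer.allocate(16384), CompressionType.NONE, 16384);

    // Oversized single record: the allocated buffer has to be big enough for the record (say 64 KB),
    // but the size limit stays at the configured batch size, so isFull() turns true as soon as that
    // record is written and the batch is dispatched instead of lingering to fill the whole 64 KB.
    MemoryRecords oversized = MemoryRecords.emptyRecords(ByteBuffer.allocate(65536), CompressionType.NONE, 16384);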

http://git-wip-us.apache.org/repos/asf/kafka/blob/f1c6e97d/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index 6a3cdcc..2f98192 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -54,7 +54,7 @@ public class NetworkClientTest {
         client.poll(reqs, 1, time.milliseconds());
         selector.clear();
         assertFalse("After we forced the disconnection the client is no longer ready.", client.ready(node, time.milliseconds()));
-        assertTrue("Metadata should get updated.", metadata.needsUpdate(time.milliseconds()));
+        assertTrue("Metadata should get updated.", metadata.timeToNextUpdate(time.milliseconds()) == 0);
     }
 
     @Test(expected = IllegalStateException.class)

http://git-wip-us.apache.org/repos/asf/kafka/blob/f1c6e97d/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java
index 8b4ac0f..0d7d04c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java
@@ -30,11 +30,11 @@ public class MetadataTest {
     public void testMetadata() throws Exception {
         long time = 0;
         metadata.update(Cluster.empty(), time);
-        assertFalse("No update needed.", metadata.needsUpdate(time));
+        assertFalse("No update needed.", metadata.timeToNextUpdate(time) == 0);
         metadata.forceUpdate();
-        assertFalse("Still no updated needed due to backoff", metadata.needsUpdate(time));
+        assertFalse("Still no updated needed due to backoff", metadata.timeToNextUpdate(time) == 0);
         time += refreshBackoffMs;
-        assertTrue("Update needed now that backoff time expired", metadata.needsUpdate(time));
+        assertTrue("Update needed now that backoff time expired", metadata.timeToNextUpdate(time) == 0);
         String topic = "my-topic";
         Thread t1 = asyncFetch(topic);
         Thread t2 = asyncFetch(topic);
@@ -43,9 +43,9 @@ public class MetadataTest {
         metadata.update(TestUtils.singletonCluster(topic, 1), time);
         t1.join();
         t2.join();
-        assertFalse("No update needed.", metadata.needsUpdate(time));
+        assertFalse("No update needed.", metadata.timeToNextUpdate(time) == 0);
         time += metadataExpireMs;
-        assertTrue("Update needed due to stale metadata.", metadata.needsUpdate(time));
+        assertTrue("Update needed due to stale metadata.", metadata.timeToNextUpdate(time) == 0);
     }
 
     private Thread asyncFetch(final String topic) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/f1c6e97d/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
index 93b58d0..0762b35 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
@@ -62,10 +62,10 @@ public class RecordAccumulatorTest {
         int appends = 1024 / msgSize;
         for (int i = 0; i < appends; i++) {
             accum.append(tp1, key, value, CompressionType.NONE, null);
-            assertEquals("No partitions should be ready.", 0, accum.ready(cluster, now).size());
+            assertEquals("No partitions should be ready.", 0, accum.ready(cluster, now).readyNodes.size());
         }
         accum.append(tp1, key, value, CompressionType.NONE, null);
-        assertEquals("Our partition's leader should be ready", Collections.singleton(node), accum.ready(cluster, time.milliseconds()));
+        assertEquals("Our partition's leader should be ready", Collections.singleton(node), accum.ready(cluster, time.milliseconds()).readyNodes);
         List<RecordBatch> batches = accum.drain(cluster, Collections.singleton(node), Integer.MAX_VALUE, 0).get(node.id());
         assertEquals(1, batches.size());
         RecordBatch batch = batches.get(0);
@@ -83,7 +83,7 @@ public class RecordAccumulatorTest {
         int batchSize = 512;
         RecordAccumulator accum = new RecordAccumulator(batchSize, 10 * 1024, 0L, 100L, false, metrics, time);
         accum.append(tp1, key, new byte[2 * batchSize], CompressionType.NONE, null);
-        assertEquals("Our partition's leader should be ready", Collections.singleton(node), accum.ready(cluster, time.milliseconds()));
+        assertEquals("Our partition's leader should be ready", Collections.singleton(node), accum.ready(cluster, time.milliseconds()).readyNodes);
     }
 
     @Test
@@ -91,9 +91,9 @@ public class RecordAccumulatorTest {
         long lingerMs = 10L;
         RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, lingerMs, 100L, false, metrics, time);
         accum.append(tp1, key, value, CompressionType.NONE, null);
-        assertEquals("No partitions should be ready", 0, accum.ready(cluster, time.milliseconds()).size());
+        assertEquals("No partitions should be ready", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size());
         time.sleep(10);
-        assertEquals("Our partition's leader should be ready", Collections.singleton(node), accum.ready(cluster, time.milliseconds()));
+        assertEquals("Our partition's leader should be ready", Collections.singleton(node), accum.ready(cluster, time.milliseconds()).readyNodes);
         List<RecordBatch> batches = accum.drain(cluster, Collections.singleton(node), Integer.MAX_VALUE, 0).get(node.id());
         assertEquals(1, batches.size());
         RecordBatch batch = batches.get(0);
@@ -113,7 +113,7 @@ public class RecordAccumulatorTest {
             for (int i = 0; i < appends; i++)
                 accum.append(tp, key, value, CompressionType.NONE, null);
         }
-        assertEquals("Partition's leader should be ready", Collections.singleton(node), accum.ready(cluster, time.milliseconds()));
+        assertEquals("Partition's leader should be ready", Collections.singleton(node), accum.ready(cluster, time.milliseconds()).readyNodes);
 
         List<RecordBatch> batches = accum.drain(cluster, Collections.singleton(node), 1024, 0).get(node.id());
         assertEquals("But due to size bound only one partition should have been retrieved", 1, batches.size());
@@ -144,7 +144,7 @@ public class RecordAccumulatorTest {
         int read = 0;
         long now = time.milliseconds();
         while (read < numThreads * msgs) {
-            Set<Node> nodes = accum.ready(cluster, now);
+            Set<Node> nodes = accum.ready(cluster, now).readyNodes;
             List<RecordBatch> batches = accum.drain(cluster, nodes, 5 * 1024, 0).get(node.id());
             if (batches != null) {
                 for (RecordBatch batch : batches) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/f1c6e97d/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java
index 5489aca..ef2ca65 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java
@@ -69,7 +69,7 @@ public class SenderTest {
     @Test
     public void testSimple() throws Exception {
         int offset = 0;
-        Future<RecordMetadata> future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null);
+        Future<RecordMetadata> future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null).future;
         sender.run(time.milliseconds()); // connect
         sender.run(time.milliseconds()); // send produce request
         assertEquals("We should have a single produce request in flight.", 1, client.inFlightRequestCount());
@@ -95,7 +95,7 @@ public class SenderTest {
                                    new Metrics(),
                                    time);
         // do a successful retry
-        Future<RecordMetadata> future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null);
+        Future<RecordMetadata> future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null).future;
         sender.run(time.milliseconds()); // connect
         sender.run(time.milliseconds()); // send produce request
         assertEquals(1, client.inFlightRequestCount());
@@ -112,7 +112,7 @@ public class SenderTest {
         assertEquals(offset, future.get().offset());
 
         // do an unsuccessful retry
-        future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null);
+        future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null).future;
         sender.run(time.milliseconds()); // send produce request
         for (int i = 0; i < maxRetries + 1; i++) {
             client.disconnect(client.requests().peek().request().destination());

http://git-wip-us.apache.org/repos/asf/kafka/blob/f1c6e97d/core/src/main/scala/kafka/tools/MirrorMaker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index 7638391..4f06e34 100644
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -21,22 +21,22 @@ import kafka.utils.{SystemTime, Utils, CommandLineUtils, Logging}
 import kafka.consumer._
 import kafka.serializer._
 import kafka.producer.{OldProducer, NewShinyProducer, BaseProducer}
+import kafka.metrics.KafkaMetricsGroup
+
 import org.apache.kafka.clients.producer.ProducerRecord
 
-import scala.collection.mutable.ListBuffer
 import scala.collection.JavaConversions._
 
-import java.util.concurrent.{TimeUnit, BlockingQueue, ArrayBlockingQueue, CountDownLatch}
-
 import joptsimple.OptionParser
-import kafka.metrics.KafkaMetricsGroup
-import com.yammer.metrics.core.Gauge
+import java.util.Random
+import java.util.concurrent.atomic.AtomicInteger
+import java.util.concurrent.{TimeUnit, BlockingQueue, ArrayBlockingQueue, CountDownLatch}
 
 object MirrorMaker extends Logging {
 
   private var connectors: Seq[ZookeeperConsumerConnector] = null
   private var consumerThreads: Seq[ConsumerThread] = null
-  private var producerThreads: ListBuffer[ProducerThread] = null
+  private var producerThreads: Seq[ProducerThread] = null
 
   private val shutdownMessage : ProducerRecord = new ProducerRecord("shutdown", "shutdown".getBytes)
 
@@ -138,13 +138,7 @@ object MirrorMaker extends Logging {
     // create a data channel btw the consumers and the producers
     val mirrorDataChannel = new DataChannel(bufferSize, numConsumers, numProducers)
 
-    producerThreads = new ListBuffer[ProducerThread]()
-    var producerIndex: Int = 1
-    for(producer <- producers) {
-      val producerThread = new ProducerThread(mirrorDataChannel, producer, producerIndex)
-      producerThreads += producerThread
-      producerIndex += 1
-    }
+    producerThreads = producers.zipWithIndex.map(producerAndIndex => new ProducerThread(mirrorDataChannel, producerAndIndex._1, producerAndIndex._2))
 
     val filterSpec = if (options.has(whitelistOpt))
       new Whitelist(options.valueOf(whitelistOpt))
@@ -190,14 +184,11 @@ object MirrorMaker extends Logging {
 
   class DataChannel(capacity: Int, numProducers: Int, numConsumers: Int) extends KafkaMetricsGroup {
 
-    val queue = new ArrayBlockingQueue[ProducerRecord](capacity)
+    val queues = new Array[BlockingQueue[ProducerRecord]](numConsumers)
+    for (i <- 0 until numConsumers)
+      queues(i) = new ArrayBlockingQueue[ProducerRecord](capacity)
 
-    newGauge(
-      "MirrorMaker-DataChannel-Size",
-      new Gauge[Int] {
-        def value = queue.size
-      }
-    )
+    private val counter = new AtomicInteger(new Random().nextInt())
 
     // We use a single meter for aggregated wait percentage for the data channel.
     // Since meter is calculated as total_recorded_value / time_window and
@@ -205,23 +196,37 @@ object MirrorMaker extends Logging {
     // time should be discounted by # threads.
     private val waitPut = newMeter("MirrorMaker-DataChannel-WaitOnPut", "percent", TimeUnit.NANOSECONDS)
     private val waitTake = newMeter("MirrorMaker-DataChannel-WaitOnTake", "percent", TimeUnit.NANOSECONDS)
+    private val channelSizeHist = newHistogram("MirrorMaker-DataChannel-Size")
 
     def put(record: ProducerRecord) {
+      // If the key of the message is null, select the queue round-robin;
+      // otherwise select the queue based on the key so that messages with the same key go to the same queue
+      val queueId =
+        if(record.key() != null) {
+          Utils.abs(java.util.Arrays.hashCode(record.key())) % numConsumers
+        } else {
+          Utils.abs(counter.getAndIncrement()) % numConsumers
+        }
+      val queue = queues(queueId)
+
       var putSucceed = false
       while (!putSucceed) {
         val startPutTime = SystemTime.nanoseconds
         putSucceed = queue.offer(record, 500, TimeUnit.MILLISECONDS)
         waitPut.mark((SystemTime.nanoseconds - startPutTime) / numProducers)
       }
+      channelSizeHist.update(queue.size)
     }
 
-    def take(): ProducerRecord = {
+    def take(queueId: Int): ProducerRecord = {
+      val queue = queues(queueId)
       var data: ProducerRecord = null
       while (data == null) {
         val startTakeTime = SystemTime.nanoseconds
         data = queue.poll(500, TimeUnit.MILLISECONDS)
         waitTake.mark((SystemTime.nanoseconds - startTakeTime) / numConsumers)
       }
+      channelSizeHist.update(queue.size)
       data
     }
   }
@@ -242,18 +247,8 @@ object MirrorMaker extends Logging {
       info("Starting mirror maker consumer thread " + threadName)
       try {
         for (msgAndMetadata <- stream) {
-          // If the key of the message is empty, put it into the universal channel
-          // Otherwise use a pre-assigned producer to send the message
-          if (msgAndMetadata.key == null) {
-            trace("Send the non-keyed message the producer channel.")
-            val data = new ProducerRecord(msgAndMetadata.topic, msgAndMetadata.message)
-            mirrorDataChannel.put(data)
-          } else {
-            val producerId = Utils.abs(java.util.Arrays.hashCode(msgAndMetadata.key)) % producers.size()
-            trace("Send message with key %s to producer %d.".format(java.util.Arrays.toString(msgAndMetadata.key), producerId))
-            val producer = producers(producerId)
-            producer.send(msgAndMetadata.topic, msgAndMetadata.key, msgAndMetadata.message)
-          }
+          val data = new ProducerRecord(msgAndMetadata.topic, msgAndMetadata.message)
+          mirrorDataChannel.put(data)
         }
       } catch {
         case e: Throwable =>
@@ -287,7 +282,7 @@ object MirrorMaker extends Logging {
       info("Starting mirror maker producer thread " + threadName)
       try {
         while (true) {
-          val data: ProducerRecord = dataChannel.take
+          val data: ProducerRecord = dataChannel.take(threadId)
           trace("Sending message with value size %d".format(data.value().size))
           if(data eq shutdownMessage) {
             info("Received shutdown message")

http://git-wip-us.apache.org/repos/asf/kafka/blob/f1c6e97d/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
index d146444..15fd5bc 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
@@ -21,7 +21,7 @@ import org.scalatest.junit.JUnit3Suite
 import org.junit.Test
 import org.junit.Assert._
 
-import java.util.Properties
+import java.util.{Random, Properties}
 import java.lang.Integer
 import java.util.concurrent.{TimeoutException, TimeUnit, ExecutionException}
 
@@ -76,8 +76,6 @@ class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness
     producer1 = TestUtils.createNewProducer(brokerList, acks = 0, blockOnBufferFull = false, bufferSize = bufferSize);
     producer2 = TestUtils.createNewProducer(brokerList, acks = 1, blockOnBufferFull = false, bufferSize = bufferSize)
     producer3 = TestUtils.createNewProducer(brokerList, acks = -1, blockOnBufferFull = false, bufferSize = bufferSize)
-    // producer with incorrect broker list
-    producer4 = TestUtils.createNewProducer("localhost:8686,localhost:4242", acks = 1, blockOnBufferFull = false, bufferSize = bufferSize)
   }
 
   override def tearDown() {
@@ -150,6 +148,9 @@ class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness
     // create topic
     TestUtils.createTopic(zkClient, topic1, 1, 2, servers)
 
+    // producer with incorrect broker list
+    producer4 = TestUtils.createNewProducer("localhost:8686,localhost:4242", acks = 1, blockOnBufferFull = false, bufferSize = bufferSize)
+
     // send a record with incorrect broker list
     val record = new ProducerRecord(topic1, null, "key".getBytes, "value".getBytes)
     intercept[ExecutionException] {
@@ -168,28 +169,32 @@ class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness
     TestUtils.createTopic(zkClient, topic1, 1, 2, servers)
 
     // first send a message to make sure the metadata is refreshed
-    val record = new ProducerRecord(topic1, null, "key".getBytes, "value".getBytes)
-    producer1.send(record).get
-    producer2.send(record).get
+    val record1 = new ProducerRecord(topic1, null, "key".getBytes, "value".getBytes)
+    producer1.send(record1).get
+    producer2.send(record1).get
 
     // stop IO threads and request handling, but leave networking operational
     // any requests should be accepted and queue up, but not handled
     server1.requestHandlerPool.shutdown()
     server2.requestHandlerPool.shutdown()
 
-    producer1.send(record).get(5000, TimeUnit.MILLISECONDS)
+    producer1.send(record1).get(5000, TimeUnit.MILLISECONDS)
 
     intercept[TimeoutException] {
-      producer2.send(record).get(5000, TimeUnit.MILLISECONDS)
+      producer2.send(record1).get(5000, TimeUnit.MILLISECONDS)
     }
 
     // TODO: expose producer configs after creating them
     // send enough messages to get buffer full
-    val tooManyRecords = bufferSize / ("key".getBytes.length + "value".getBytes.length)
+    val msgSize = 10000
+    val value = new Array[Byte](msgSize)
+    new Random().nextBytes(value)
+    val record2 = new ProducerRecord(topic1, null, "key".getBytes, value)
+    val tooManyRecords = bufferSize / ("key".getBytes.length + value.length)
 
     intercept[KafkaException] {
       for (i <- 1 to tooManyRecords)
-        producer2.send(record)
+        producer2.send(record2)
     }
 
     // do not close produce2 since it will block

http://git-wip-us.apache.org/repos/asf/kafka/blob/f1c6e97d/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index 62fb02c..1c492de 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -128,11 +128,15 @@ class SocketServerTest extends JUnitSuite {
 
   @Test(expected = classOf[IOException])
   def testSocketsCloseOnShutdown() {
-    // open a connection and then shutdown the server
+    // open a connection
     val socket = connect()
+    val bytes = new Array[Byte](40)
+    // send a request first to make sure the connection has been picked up by the socket server
+    sendRequest(socket, 0, bytes)
+    processRequest(server.requestChannel)
+    // then shutdown the server
     server.shutdown()
     // doing a subsequent send should throw an exception as the connection should be closed.
-    val bytes = new Array[Byte](10)
     sendRequest(socket, 0, bytes)
   }
 }