Posted to commits@kafka.apache.org by jk...@apache.org on 2014/05/16 01:57:59 UTC

git commit: KAFKA-1445 Send all partitions, regardless of how full, whenever we are sending a request to a broker. Patch from Guozhang.

Repository: kafka
Updated Branches:
  refs/heads/trunk b866c5506 -> 99f10739b


KAFKA-1445 Send all partitions, regardless of how full, whenever we are sending a request to a broker. Patch from Guozhang.


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/99f10739
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/99f10739
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/99f10739

Branch: refs/heads/trunk
Commit: 99f10739b5921534dddc6e3773e4f8a05b8909b0
Parents: b866c55
Author: Jay Kreps <ja...@gmail.com>
Authored: Thu May 15 16:56:45 2014 -0700
Committer: Jay Kreps <ja...@gmail.com>
Committed: Thu May 15 16:56:45 2014 -0700

----------------------------------------------------------------------
 .../kafka/clients/producer/KafkaProducer.java   |   2 +-
 .../kafka/clients/producer/MockProducer.java    |   4 +-
 .../clients/producer/internals/Metadata.java    |   2 +-
 .../clients/producer/internals/Partitioner.java |   4 +-
 .../producer/internals/RecordAccumulator.java   | 125 ++++++++++---------
 .../clients/producer/internals/Sender.java      |  52 +++-----
 .../java/org/apache/kafka/common/Cluster.java   |  46 +++++--
 .../clients/producer/RecordAccumulatorTest.java |  63 ++++++----
 .../common/utils/AbstractIteratorTest.java      |   9 +-
 9 files changed, 167 insertions(+), 140 deletions(-)
----------------------------------------------------------------------
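
At a high level, RecordAccumulator.ready() now answers "which broker nodes have sendable data" rather than "which topic-partitions are ready", and drain() returns the drainable batches for every partition led by those nodes, keyed by node id, so a single produce request to a broker carries all of its ready partitions. The sketch below is modeled on the updated RecordAccumulatorTest further down in this diff; constructor and method signatures are taken from that test, while the MockTime import path is an assumption about the clients test-time helper.

import java.util.*;

import org.apache.kafka.clients.producer.internals.RecordAccumulator;
import org.apache.kafka.clients.producer.internals.RecordBatch;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.test.MockTime; // assumed location of the clients test-time helper

public class PerNodeDrainSketch {
    public static void main(String[] args) throws Exception {
        // one broker leading both partitions of "test", as in RecordAccumulatorTest
        Node node = new Node(0, "localhost", 1111);
        PartitionInfo part1 = new PartitionInfo("test", 0, node, null, null);
        PartitionInfo part2 = new PartitionInfo("test", 1, node, null, null);
        Cluster cluster = new Cluster(Collections.singleton(node), Arrays.asList(part1, part2));

        MockTime time = new MockTime();
        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 0L, 100L, false, new Metrics(time), time);

        // buffer one record in each partition led by the broker
        accum.append(new TopicPartition("test", 0), "key".getBytes(), "value".getBytes(), CompressionType.NONE, null);
        accum.append(new TopicPartition("test", 1), "key".getBytes(), "value".getBytes(), CompressionType.NONE, null);

        // ready() is now keyed by destination node; with lingerMs = 0 the node is ready at once
        Set<Node> readyNodes = accum.ready(cluster, time.milliseconds());

        // drain() groups batches by node id and pulls a batch from every partition the node leads,
        // which is what lets the sender pack all ready partitions into one request per broker
        Map<Integer, List<RecordBatch>> perNode = accum.drain(cluster, readyNodes, Integer.MAX_VALUE, time.milliseconds());
        System.out.println("batches for node 0: " + perNode.get(node.id()).size()); // expect 2
    }
}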


http://git-wip-us.apache.org/repos/asf/kafka/blob/99f10739/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index a6423f4..90cacbd 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -283,7 +283,7 @@ public class KafkaProducer implements Producer {
     }
 
     public List<PartitionInfo> partitionsFor(String topic) {
-        return this.metadata.fetch(topic, this.metadataFetchTimeoutMs).partitionsFor(topic);
+        return this.metadata.fetch(topic, this.metadataFetchTimeoutMs).partitionsForTopic(topic);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/99f10739/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
index 6a0f3b2..c0f1d57 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
@@ -102,7 +102,7 @@ public class MockProducer implements Producer {
     @Override
     public synchronized Future<RecordMetadata> send(ProducerRecord record, Callback callback) {
         int partition = 0;
-        if (this.cluster.partitionsFor(record.topic()) != null)
+        if (this.cluster.partitionsForTopic(record.topic()) != null)
             partition = partitioner.partition(record, this.cluster);
         ProduceRequestResult result = new ProduceRequestResult();
         FutureRecordMetadata future = new FutureRecordMetadata(result, 0);
@@ -133,7 +133,7 @@ public class MockProducer implements Producer {
     }
 
     public List<PartitionInfo> partitionsFor(String topic) {
-        return this.cluster.partitionsFor(topic);
+        return this.cluster.partitionsForTopic(topic);
     }
 
     public Map<String, Metric> metrics() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/99f10739/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
index f114ffd..f47a461 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
@@ -81,7 +81,7 @@ public final class Metadata {
         long begin = System.currentTimeMillis();
         long remainingWaitMs = maxWaitMs;
         do {
-            partitions = cluster.partitionsFor(topic);
+            partitions = cluster.partitionsForTopic(topic);
             if (partitions == null) {
                 topics.add(topic);
                 forceUpdate = true;

http://git-wip-us.apache.org/repos/asf/kafka/blob/99f10739/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java
index fbb732a..40e8234 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java
@@ -41,10 +41,10 @@ public class Partitioner {
      * Compute the partition for the given record.
      * 
      * @param record The record being sent
-     * @param numPartitions The total number of partitions for the given topic
+     * @param cluster The current cluster metadata
      */
     public int partition(ProducerRecord record, Cluster cluster) {
-        List<PartitionInfo> partitions = cluster.partitionsFor(record.topic());
+        List<PartitionInfo> partitions = cluster.partitionsForTopic(record.topic());
         int numPartitions = partitions.size();
         if (record.partition() != null) {
             // they have given us a partition, use it

http://git-wip-us.apache.org/repos/asf/kafka/blob/99f10739/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 2d7e52d..5ededcc 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -13,15 +13,13 @@
 package org.apache.kafka.clients.producer.internals;
 
 import java.nio.ByteBuffer;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Deque;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.concurrent.ConcurrentMap;
 
 import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.metrics.Measurable;
 import org.apache.kafka.common.metrics.MetricConfig;
@@ -111,11 +109,6 @@ public final class RecordAccumulator {
                                   return free.availableMemory();
                               }
                           });
-        metrics.addMetric("ready-partitions", "The number of topic-partitions with buffered data ready to be sent.", new Measurable() {
-            public double measure(MetricConfig config, long nowMs) {
-                return ready(nowMs).size();
-            }
-        });
     }
 
     /**
@@ -180,9 +173,10 @@ public final class RecordAccumulator {
     }
 
     /**
-     * Get a list of topic-partitions which are ready to be sent.
+     * Get a list of nodes whose partitions are ready to be sent.
      * <p>
-     * A partition is ready if ANY of the following are true:
+     * A destination node is ready to send data if ANY one of its partitions is not backing off the send
+     * and ANY of the following are true:
      * <ol>
      * <li>The record set is full
      * <li>The record set has sat in the accumulator for at least lingerMs milliseconds
@@ -191,24 +185,31 @@ public final class RecordAccumulator {
      * <li>The accumulator has been closed
      * </ol>
      */
-    public List<TopicPartition> ready(long nowMs) {
-        List<TopicPartition> ready = new ArrayList<TopicPartition>();
+    public Set<Node> ready(Cluster cluster, long nowMs) {
+        Set<Node> readyNodes = new HashSet<Node>();
         boolean exhausted = this.free.queued() > 0;
+
         for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) {
+            TopicPartition part = entry.getKey();
             Deque<RecordBatch> deque = entry.getValue();
-            synchronized (deque) {
-                RecordBatch batch = deque.peekFirst();
-                if (batch != null) {
-                    boolean backingOff = batch.attempts > 0 && batch.lastAttemptMs + retryBackoffMs > nowMs;
-                    boolean full = deque.size() > 1 || batch.records.isFull();
-                    boolean expired = nowMs - batch.createdMs >= lingerMs;
-                    boolean sendable = full || expired || exhausted || closed;
-                    if (sendable && !backingOff)
-                        ready.add(batch.topicPartition);
+            // if the leader is unknown use an Unknown node placeholder
+            Node leader = cluster.leaderFor(part);
+            if (!readyNodes.contains(leader)) {
+                synchronized (deque) {
+                    RecordBatch batch = deque.peekFirst();
+                    if (batch != null) {
+                        boolean backingOff = batch.attempts > 0 && batch.lastAttemptMs + retryBackoffMs > nowMs;
+                        boolean full = deque.size() > 1 || batch.records.isFull();
+                        boolean expired = nowMs - batch.createdMs >= lingerMs;
+                        boolean sendable = full || expired || exhausted || closed;
+                        if (sendable && !backingOff)
+                            readyNodes.add(leader);
+                    }
                 }
             }
         }
-        return ready;
+
+        return readyNodes;
     }
 
     /**
@@ -226,45 +227,55 @@ public final class RecordAccumulator {
     }
 
     /**
-     * Drain all the data for the given topic-partitions that will fit within the specified size. This method attempts
-     * to avoid choosing the same topic-partitions over and over.
+     * Drain all the data for the given nodes and collate them into a list of
+     * batches that will fit within the specified size on a per-node basis.
+     * This method attempts to avoid choosing the same topic-partitions over and over.
      * 
-     * @param partitions The list of partitions to drain
+     * @param cluster The current cluster metadata
+     * @param nodes The list of nodes to drain
      * @param maxSize The maximum number of bytes to drain
      * @param nowMs The current unix time in milliseconds
-     * @return A list of {@link RecordBatch} for partitions specified with total size less than the requested maxSize.
+     * @return A map from node id to a list of {@link RecordBatch} for that node, with total size less than the requested maxSize per node.
      *         TODO: There may be a starvation issue due to iteration order
      */
-    public List<RecordBatch> drain(List<TopicPartition> partitions, int maxSize, long nowMs) {
-        if (partitions.isEmpty())
-            return Collections.emptyList();
-        int size = 0;
-        List<RecordBatch> ready = new ArrayList<RecordBatch>();
-        /* to make starvation less likely this loop doesn't start at 0 */
-        int start = drainIndex = drainIndex % partitions.size();
-        do {
-            TopicPartition tp = partitions.get(drainIndex);
-            Deque<RecordBatch> deque = dequeFor(tp);
-            if (deque != null) {
-                synchronized (deque) {
-                    RecordBatch first = deque.peekFirst();
-                    if (size + first.records.sizeInBytes() > maxSize && !ready.isEmpty()) {
-                        // there is a rare case that a single batch size is larger than the request size due
-                        // to compression; in this case we will still eventually send this batch in a single
-                        // request
-                        return ready;
-                    } else {
-                        RecordBatch batch = deque.pollFirst();
-                        batch.records.close();
-                        size += batch.records.sizeInBytes();
-                        ready.add(batch);
-                        batch.drainedMs = nowMs;
+    public Map<Integer, List<RecordBatch>> drain(Cluster cluster, Set<Node> nodes, int maxSize, long nowMs) {
+        if (nodes.isEmpty())
+            return Collections.emptyMap();
+
+        Map<Integer, List<RecordBatch>> batches = new HashMap<Integer, List<RecordBatch>>();
+        for (Node node : nodes) {
+            int size = 0;
+            List<PartitionInfo> parts = cluster.partitionsForNode(node.id());
+            List<RecordBatch> ready = new ArrayList<RecordBatch>();
+            /* to make starvation less likely this loop doesn't start at 0 */
+            int start = drainIndex = drainIndex % parts.size();
+            do {
+                PartitionInfo part = parts.get(drainIndex);
+                Deque<RecordBatch> deque = dequeFor(new TopicPartition(part.topic(), part.partition()));
+                if (deque != null) {
+                    synchronized (deque) {
+                        RecordBatch first = deque.peekFirst();
+                        if (first != null) {
+                            if (size + first.records.sizeInBytes() > maxSize && !ready.isEmpty()) {
+                                // there is a rare case that a single batch size is larger than the request size due
+                                // to compression; in this case we will still eventually send this batch in a single
+                                // request
+                                break;
+                            } else {
+                                RecordBatch batch = deque.pollFirst();
+                                batch.records.close();
+                                size += batch.records.sizeInBytes();
+                                ready.add(batch);
+                                batch.drainedMs = nowMs;
+                            }
+                        }
                     }
                 }
-            }
-            this.drainIndex = (this.drainIndex + 1) % partitions.size();
-        } while (start != drainIndex);
-        return ready;
+                this.drainIndex = (this.drainIndex + 1) % parts.size();
+            } while (start != drainIndex);
+            batches.put(node.id(), ready);
+        }
+        return batches;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/99f10739/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index f0152fa..3e83ae0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -15,15 +15,7 @@ package org.apache.kafka.clients.producer.internals;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Deque;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
+import java.util.*;
 
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
@@ -192,18 +184,18 @@ public class Sender implements Runnable {
     public void run(long nowMs) {
         Cluster cluster = metadata.fetch();
         // get the list of partitions with data ready to send
-        List<TopicPartition> ready = this.accumulator.ready(nowMs);
+        Set<Node> ready = this.accumulator.ready(cluster, nowMs);
 
         // should we update our metadata?
         List<NetworkSend> sends = new ArrayList<NetworkSend>();
         maybeUpdateMetadata(cluster, sends, nowMs);
 
-        // prune the list of ready topics to eliminate any that we aren't ready to send yet
-        List<TopicPartition> sendable = processReadyPartitions(cluster, ready, nowMs);
+        // prune the list of ready nodes to eliminate any that we aren't ready to send yet
+        Set<Node> sendable = processReadyNode(ready, nowMs);
 
         // create produce requests
-        List<RecordBatch> batches = this.accumulator.drain(sendable, this.maxRequestSize, nowMs);
-        List<InFlightRequest> requests = collate(cluster, batches, nowMs);
+        Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster, sendable, this.maxRequestSize, nowMs);
+        List<InFlightRequest> requests = generateProduceRequests(batches, nowMs);
         sensors.updateProduceRequestMetrics(requests);
 
         if (ready.size() > 0) {
@@ -310,19 +302,21 @@ public class Sender implements Runnable {
     }
 
     /**
-     * Process the set of topic-partitions with data ready to send. If we have a connection to the appropriate node, add
-     * it to the returned set. For any partitions we have no connection to either make one, fetch the appropriate
-     * metadata to be able to do so
+     * Process the set of destination nodes with data ready to send.
+     *
+     * 1) If we have an unknown leader node, force refresh the metadata.
+     * 2) If we have a connection to the appropriate node, add
+     *    it to the returned set;
+     * 3) If we do not have a connection yet, initiate one.
      */
-    private List<TopicPartition> processReadyPartitions(Cluster cluster, List<TopicPartition> ready, long nowMs) {
-        List<TopicPartition> sendable = new ArrayList<TopicPartition>(ready.size());
-        for (TopicPartition tp : ready) {
-            Node node = cluster.leaderFor(tp);
+    private Set<Node> processReadyNode(Set<Node> ready, long nowMs) {
+        Set<Node> sendable = new HashSet<Node>(ready.size());
+        for (Node node : ready) {
             if (node == null) {
                 // we don't know about this topic/partition or it has no leader, re-fetch metadata
                 metadata.forceUpdate();
             } else if (nodeStates.isConnected(node.id()) && inFlightRequests.canSendMore(node.id())) {
-                sendable.add(tp);
+                sendable.add(node);
             } else if (nodeStates.canConnect(node.id(), nowMs)) {
                 // we don't have a connection to this node right now, make one
                 initiateConnect(node, nowMs);
@@ -520,19 +514,9 @@ public class Sender implements Runnable {
     }
 
     /**
-     * Collate the record batches into a list of produce requests on a per-node basis
+     * Transfer the record batches into a list of produce requests on a per-node basis
      */
-    private List<InFlightRequest> collate(Cluster cluster, List<RecordBatch> batches, long nowMs) {
-        Map<Integer, List<RecordBatch>> collated = new HashMap<Integer, List<RecordBatch>>();
-        for (RecordBatch batch : batches) {
-            Node node = cluster.leaderFor(batch.topicPartition);
-            List<RecordBatch> found = collated.get(node.id());
-            if (found == null) {
-                found = new ArrayList<RecordBatch>();
-                collated.put(node.id(), found);
-            }
-            found.add(batch);
-        }
+    private List<InFlightRequest> generateProduceRequests(Map<Integer, List<RecordBatch>> collated, long nowMs) {
         List<InFlightRequest> requests = new ArrayList<InFlightRequest>(collated.size());
         for (Map.Entry<Integer, List<RecordBatch>> entry : collated.entrySet())
             requests.add(produceRequest(nowMs, entry.getKey(), acks, requestTimeout, entry.getValue()));

http://git-wip-us.apache.org/repos/asf/kafka/blob/99f10739/clients/src/main/java/org/apache/kafka/common/Cluster.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/Cluster.java b/clients/src/main/java/org/apache/kafka/common/Cluster.java
index 426bd1e..c62707a 100644
--- a/clients/src/main/java/org/apache/kafka/common/Cluster.java
+++ b/clients/src/main/java/org/apache/kafka/common/Cluster.java
@@ -12,6 +12,8 @@
  */
 package org.apache.kafka.common;
 
+import org.apache.kafka.common.utils.Utils;
+
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -28,6 +30,7 @@ public final class Cluster {
     private final List<Node> nodes;
     private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition;
     private final Map<String, List<PartitionInfo>> partitionsByTopic;
+    private final Map<Integer, List<PartitionInfo>> partitionsByNode;
 
     /**
      * Create a new cluster with the given nodes and partitions
@@ -45,18 +48,32 @@ public final class Cluster {
         for (PartitionInfo p : partitions)
             this.partitionsByTopicPartition.put(new TopicPartition(p.topic(), p.partition()), p);
 
-        // index the partitions by topic and make the lists unmodifiable so we can handle them out in
-        // user-facing apis without risk of the client modifying the contents
-        HashMap<String, List<PartitionInfo>> parts = new HashMap<String, List<PartitionInfo>>();
+        // index the partitions by topic and node respectively, and make the lists
+        // unmodifiable so we can hand them out in user-facing apis without risk
+        // of the client modifying the contents
+        HashMap<String, List<PartitionInfo>> partsForTopic = new HashMap<String, List<PartitionInfo>>();
+        HashMap<Integer, List<PartitionInfo>> partsForNode = new HashMap<Integer, List<PartitionInfo>>();
+        for (Node n : this.nodes) {
+            partsForNode.put(n.id(), new ArrayList<PartitionInfo>());
+        }
         for (PartitionInfo p : partitions) {
-            if (!parts.containsKey(p.topic()))
-                parts.put(p.topic(), new ArrayList<PartitionInfo>());
-            List<PartitionInfo> ps = parts.get(p.topic());
-            ps.add(p);
+            if (!partsForTopic.containsKey(p.topic()))
+                partsForTopic.put(p.topic(), new ArrayList<PartitionInfo>());
+            List<PartitionInfo> psTopic = partsForTopic.get(p.topic());
+            psTopic.add(p);
+
+            if (p.leader() != null) {
+                List<PartitionInfo> psNode = Utils.notNull(partsForNode.get(p.leader().id()));
+                psNode.add(p);
+            }
         }
-        this.partitionsByTopic = new HashMap<String, List<PartitionInfo>>(parts.size());
-        for (Map.Entry<String, List<PartitionInfo>> entry : parts.entrySet())
+        this.partitionsByTopic = new HashMap<String, List<PartitionInfo>>(partsForTopic.size());
+        for (Map.Entry<String, List<PartitionInfo>> entry : partsForTopic.entrySet())
             this.partitionsByTopic.put(entry.getKey(), Collections.unmodifiableList(entry.getValue()));
+        this.partitionsByNode = new HashMap<Integer, List<PartitionInfo>>(partsForNode.size());
+        for (Map.Entry<Integer, List<PartitionInfo>> entry : partsForNode.entrySet())
+            this.partitionsByNode.put(entry.getKey(), Collections.unmodifiableList(entry.getValue()));
+
     }
 
     /**
@@ -113,10 +130,19 @@ public final class Cluster {
      * @param topic The topic name
      * @return A list of partitions
      */
-    public List<PartitionInfo> partitionsFor(String topic) {
+    public List<PartitionInfo> partitionsForTopic(String topic) {
         return this.partitionsByTopic.get(topic);
     }
 
+    /**
+     * Get the list of partitions whose leader is this node
+     * @param nodeId The node id
+     * @return A list of partitions
+     */
+    public List<PartitionInfo> partitionsForNode(int nodeId) {
+        return this.partitionsByNode.get(nodeId);
+    }
+
     @Override
     public String toString() {
         return "Cluster(nodes = " + this.nodes + ", partitions = " + this.partitionsByTopicPartition.values() + ")";

http://git-wip-us.apache.org/repos/asf/kafka/blob/99f10739/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
index f37ab77..c4072ae 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
@@ -17,12 +17,13 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
+import java.util.*;
 
 import org.apache.kafka.clients.producer.internals.RecordAccumulator;
 import org.apache.kafka.clients.producer.internals.RecordBatch;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.record.CompressionType;
@@ -34,11 +35,19 @@ import org.junit.Test;
 
 public class RecordAccumulatorTest {
 
-    private TopicPartition tp = new TopicPartition("test", 0);
+    private String topic = "test";
+    private int partition1 = 0;
+    private int partition2 = 1;
+    private Node node = new Node(0, "localhost", 1111);
+    private TopicPartition tp1 = new TopicPartition(topic, partition1);
+    private TopicPartition tp2 = new TopicPartition(topic, partition2);
+    private PartitionInfo part1 = new PartitionInfo(topic, partition1, node, null, null);
+    private PartitionInfo part2 = new PartitionInfo(topic, partition2, node, null, null);
     private MockTime time = new MockTime();
     private byte[] key = "key".getBytes();
     private byte[] value = "value".getBytes();
     private int msgSize = Records.LOG_OVERHEAD + Record.recordSize(key, value);
+    private Cluster cluster = new Cluster(Collections.singleton(node), Arrays.asList(part1, part2));
     private Metrics metrics = new Metrics(time);
 
     @Test
@@ -47,12 +56,12 @@ public class RecordAccumulatorTest {
         RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 10L, 100L, false, metrics, time);
         int appends = 1024 / msgSize;
         for (int i = 0; i < appends; i++) {
-            accum.append(tp, key, value, CompressionType.NONE, null);
-            assertEquals("No partitions should be ready.", 0, accum.ready(now).size());
+            accum.append(tp1, key, value, CompressionType.NONE, null);
+            assertEquals("No partitions should be ready.", 0, accum.ready(cluster, now).size());
         }
-        accum.append(tp, key, value, CompressionType.NONE, null);
-        assertEquals("Our partition should be ready", asList(tp), accum.ready(time.milliseconds()));
-        List<RecordBatch> batches = accum.drain(asList(tp), Integer.MAX_VALUE, 0);
+        accum.append(tp1, key, value, CompressionType.NONE, null);
+        assertEquals("Our partition's leader should be ready", Collections.singleton(node), accum.ready(cluster, time.milliseconds()));
+        List<RecordBatch> batches = accum.drain(cluster, Collections.singleton(node), Integer.MAX_VALUE, 0).get(node.id());
         assertEquals(1, batches.size());
         RecordBatch batch = batches.get(0);
         Iterator<LogEntry> iter = batch.records.iterator();
@@ -68,19 +77,19 @@ public class RecordAccumulatorTest {
     public void testAppendLarge() throws Exception {
         int batchSize = 512;
         RecordAccumulator accum = new RecordAccumulator(batchSize, 10 * 1024, 0L, 100L, false, metrics, time);
-        accum.append(tp, key, new byte[2 * batchSize], CompressionType.NONE, null);
-        assertEquals("Our partition should be ready", asList(tp), accum.ready(time.milliseconds()));
+        accum.append(tp1, key, new byte[2 * batchSize], CompressionType.NONE, null);
+        assertEquals("Our partition's leader should be ready", Collections.singleton(node), accum.ready(cluster, time.milliseconds()));
     }
 
     @Test
     public void testLinger() throws Exception {
         long lingerMs = 10L;
         RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, lingerMs, 100L, false, metrics, time);
-        accum.append(tp, key, value, CompressionType.NONE, null);
-        assertEquals("No partitions should be ready", 0, accum.ready(time.milliseconds()).size());
+        accum.append(tp1, key, value, CompressionType.NONE, null);
+        assertEquals("No partitions should be ready", 0, accum.ready(cluster, time.milliseconds()).size());
         time.sleep(10);
-        assertEquals("Our partition should be ready", asList(tp), accum.ready(time.milliseconds()));
-        List<RecordBatch> batches = accum.drain(asList(tp), Integer.MAX_VALUE, 0);
+        assertEquals("Our partition's leader should be ready", Collections.singleton(node), accum.ready(cluster, time.milliseconds()));
+        List<RecordBatch> batches = accum.drain(cluster, Collections.singleton(node), Integer.MAX_VALUE, 0).get(node.id());
         assertEquals(1, batches.size());
         RecordBatch batch = batches.get(0);
         Iterator<LogEntry> iter = batch.records.iterator();
@@ -94,14 +103,14 @@ public class RecordAccumulatorTest {
     public void testPartialDrain() throws Exception {
         RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 10L, 100L, false, metrics, time);
         int appends = 1024 / msgSize + 1;
-        List<TopicPartition> partitions = asList(new TopicPartition("test", 0), new TopicPartition("test", 1));
+        List<TopicPartition> partitions = asList(tp1, tp2);
         for (TopicPartition tp : partitions) {
             for (int i = 0; i < appends; i++)
                 accum.append(tp, key, value, CompressionType.NONE, null);
         }
-        assertEquals("Both partitions should be ready", 2, accum.ready(time.milliseconds()).size());
+        assertEquals("Partition's leader should be ready", Collections.singleton(node), accum.ready(cluster, time.milliseconds()));
 
-        List<RecordBatch> batches = accum.drain(partitions, 1024, 0);
+        List<RecordBatch> batches = accum.drain(cluster, Collections.singleton(node), 1024, 0).get(node.id());
         assertEquals("But due to size bound only one partition should have been retrieved", 1, batches.size());
     }
 
@@ -109,7 +118,7 @@ public class RecordAccumulatorTest {
     public void testStressfulSituation() throws Exception {
         final int numThreads = 5;
         final int msgs = 10000;
-        final int numParts = 10;
+        final int numParts = 2;
         final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 0L, 100L, true, metrics, time);
         List<Thread> threads = new ArrayList<Thread>();
         for (int i = 0; i < numThreads; i++) {
@@ -117,7 +126,7 @@ public class RecordAccumulatorTest {
                 public void run() {
                     for (int i = 0; i < msgs; i++) {
                         try {
-                            accum.append(new TopicPartition("test", i % numParts), key, value, CompressionType.NONE, null);
+                            accum.append(new TopicPartition(topic, i % numParts), key, value, CompressionType.NONE, null);
                         } catch (Exception e) {
                             e.printStackTrace();
                         }
@@ -130,12 +139,14 @@ public class RecordAccumulatorTest {
         int read = 0;
         long now = time.milliseconds();
         while (read < numThreads * msgs) {
-            List<TopicPartition> tps = accum.ready(now);
-            List<RecordBatch> batches = accum.drain(tps, 5 * 1024, 0);
-            for (RecordBatch batch : batches) {
-                for (LogEntry entry : batch.records)
-                    read++;
-                accum.deallocate(batch);
+            Set<Node> nodes = accum.ready(cluster, now);
+            List<RecordBatch> batches = accum.drain(cluster, nodes, 5 * 1024, 0).get(node.id());
+            if (batches != null) {
+                for (RecordBatch batch : batches) {
+                    for (LogEntry entry : batch.records)
+                        read++;
+                    accum.deallocate(batch);
+                }
             }
         }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/99f10739/clients/src/test/java/org/apache/kafka/common/utils/AbstractIteratorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/AbstractIteratorTest.java b/clients/src/test/java/org/apache/kafka/common/utils/AbstractIteratorTest.java
index 1df2266..c788e66 100644
--- a/clients/src/test/java/org/apache/kafka/common/utils/AbstractIteratorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/AbstractIteratorTest.java
@@ -20,13 +20,8 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-import java.util.NoSuchElementException;
+import java.util.*;
 
-import org.apache.kafka.common.utils.AbstractIterator;
 import org.junit.Test;
 
 public class AbstractIteratorTest {
@@ -49,7 +44,7 @@ public class AbstractIteratorTest {
 
     @Test(expected = NoSuchElementException.class)
     public void testEmptyIterator() {
-        Iterator<Object> iter = new ListIterator<Object>(Arrays.asList());
+        Iterator<Object> iter = new ListIterator<Object>(Collections.emptyList());
         iter.next();
     }