Posted to commits@kafka.apache.org by jk...@apache.org on 2014/05/16 01:57:59 UTC
git commit: KAFKA-1445 Send all partitions, regardless of how full, whenever we are sending a request to a broker. Patch from Guozhang.
Repository: kafka
Updated Branches:
refs/heads/trunk b866c5506 -> 99f10739b
KAFKA-1445 Send all partitions, regardless of how full, whenever we are sending a request to a broker. Patch from Guozhang.
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/99f10739
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/99f10739
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/99f10739
Branch: refs/heads/trunk
Commit: 99f10739b5921534dddc6e3773e4f8a05b8909b0
Parents: b866c55
Author: Jay Kreps <ja...@gmail.com>
Authored: Thu May 15 16:56:45 2014 -0700
Committer: Jay Kreps <ja...@gmail.com>
Committed: Thu May 15 16:56:45 2014 -0700
----------------------------------------------------------------------
.../kafka/clients/producer/KafkaProducer.java | 2 +-
.../kafka/clients/producer/MockProducer.java | 4 +-
.../clients/producer/internals/Metadata.java | 2 +-
.../clients/producer/internals/Partitioner.java | 4 +-
.../producer/internals/RecordAccumulator.java | 125 ++++++++++---------
.../clients/producer/internals/Sender.java | 52 +++-----
.../java/org/apache/kafka/common/Cluster.java | 46 +++++--
.../clients/producer/RecordAccumulatorTest.java | 63 ++++++----
.../common/utils/AbstractIteratorTest.java | 9 +-
9 files changed, 167 insertions(+), 140 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/99f10739/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index a6423f4..90cacbd 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -283,7 +283,7 @@ public class KafkaProducer implements Producer {
}
public List<PartitionInfo> partitionsFor(String topic) {
- return this.metadata.fetch(topic, this.metadataFetchTimeoutMs).partitionsFor(topic);
+ return this.metadata.fetch(topic, this.metadataFetchTimeoutMs).partitionsForTopic(topic);
}
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/99f10739/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
index 6a0f3b2..c0f1d57 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
@@ -102,7 +102,7 @@ public class MockProducer implements Producer {
@Override
public synchronized Future<RecordMetadata> send(ProducerRecord record, Callback callback) {
int partition = 0;
- if (this.cluster.partitionsFor(record.topic()) != null)
+ if (this.cluster.partitionsForTopic(record.topic()) != null)
partition = partitioner.partition(record, this.cluster);
ProduceRequestResult result = new ProduceRequestResult();
FutureRecordMetadata future = new FutureRecordMetadata(result, 0);
@@ -133,7 +133,7 @@ public class MockProducer implements Producer {
}
public List<PartitionInfo> partitionsFor(String topic) {
- return this.cluster.partitionsFor(topic);
+ return this.cluster.partitionsForTopic(topic);
}
public Map<String, Metric> metrics() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/99f10739/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
index f114ffd..f47a461 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
@@ -81,7 +81,7 @@ public final class Metadata {
long begin = System.currentTimeMillis();
long remainingWaitMs = maxWaitMs;
do {
- partitions = cluster.partitionsFor(topic);
+ partitions = cluster.partitionsForTopic(topic);
if (partitions == null) {
topics.add(topic);
forceUpdate = true;
http://git-wip-us.apache.org/repos/asf/kafka/blob/99f10739/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java
index fbb732a..40e8234 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java
@@ -41,10 +41,10 @@ public class Partitioner {
* Compute the partition for the given record.
*
* @param record The record being sent
- * @param numPartitions The total number of partitions for the given topic
+ * @param cluster The current cluster metadata
*/
public int partition(ProducerRecord record, Cluster cluster) {
- List<PartitionInfo> partitions = cluster.partitionsFor(record.topic());
+ List<PartitionInfo> partitions = cluster.partitionsForTopic(record.topic());
int numPartitions = partitions.size();
if (record.partition() != null) {
// they have given us a partition, use it
http://git-wip-us.apache.org/repos/asf/kafka/blob/99f10739/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 2d7e52d..5ededcc 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -13,15 +13,13 @@
package org.apache.kafka.clients.producer.internals;
import java.nio.ByteBuffer;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Deque;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import java.util.concurrent.ConcurrentMap;
import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.MetricConfig;
@@ -111,11 +109,6 @@ public final class RecordAccumulator {
return free.availableMemory();
}
});
- metrics.addMetric("ready-partitions", "The number of topic-partitions with buffered data ready to be sent.", new Measurable() {
- public double measure(MetricConfig config, long nowMs) {
- return ready(nowMs).size();
- }
- });
}
/**
@@ -180,9 +173,10 @@ public final class RecordAccumulator {
}
/**
- * Get a list of topic-partitions which are ready to be sent.
+ * Get a list of nodes whose partitions are ready to be sent.
* <p>
- * A partition is ready if ANY of the following are true:
+ * A destination node is ready to send data if ANY one of its partitions is not backing off the send
+ * and ANY of the following are true:
* <ol>
* <li>The record set is full
* <li>The record set has sat in the accumulator for at least lingerMs milliseconds
@@ -191,24 +185,31 @@ public final class RecordAccumulator {
* <li>The accumulator has been closed
* </ol>
*/
- public List<TopicPartition> ready(long nowMs) {
- List<TopicPartition> ready = new ArrayList<TopicPartition>();
+ public Set<Node> ready(Cluster cluster, long nowMs) {
+ Set<Node> readyNodes = new HashSet<Node>();
boolean exhausted = this.free.queued() > 0;
+
for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) {
+ TopicPartition part = entry.getKey();
Deque<RecordBatch> deque = entry.getValue();
- synchronized (deque) {
- RecordBatch batch = deque.peekFirst();
- if (batch != null) {
- boolean backingOff = batch.attempts > 0 && batch.lastAttemptMs + retryBackoffMs > nowMs;
- boolean full = deque.size() > 1 || batch.records.isFull();
- boolean expired = nowMs - batch.createdMs >= lingerMs;
- boolean sendable = full || expired || exhausted || closed;
- if (sendable && !backingOff)
- ready.add(batch.topicPartition);
+ // if the leader is unknown, leaderFor returns null; the sender treats a null entry as a cue to refresh metadata
+ Node leader = cluster.leaderFor(part);
+ if (!readyNodes.contains(leader)) {
+ synchronized (deque) {
+ RecordBatch batch = deque.peekFirst();
+ if (batch != null) {
+ boolean backingOff = batch.attempts > 0 && batch.lastAttemptMs + retryBackoffMs > nowMs;
+ boolean full = deque.size() > 1 || batch.records.isFull();
+ boolean expired = nowMs - batch.createdMs >= lingerMs;
+ boolean sendable = full || expired || exhausted || closed;
+ if (sendable && !backingOff)
+ readyNodes.add(leader);
+ }
}
}
}
- return ready;
+
+ return readyNodes;
}
/**
@@ -226,45 +227,55 @@ public final class RecordAccumulator {
}
/**
- * Drain all the data for the given topic-partitions that will fit within the specified size. This method attempts
- * to avoid choosing the same topic-partitions over and over.
+ * Drain all the data for the given nodes and collate it into a list of
+ * batches that will fit within the specified size on a per-node basis.
+ * This method attempts to avoid choosing the same topic-partitions over and over.
*
- * @param partitions The list of partitions to drain
+ * @param cluster The current cluster metadata
+ * @param nodes The list of nodes to drain
* @param maxSize The maximum number of bytes to drain
* @param nowMs The current unix time in milliseconds
- * @return A list of {@link RecordBatch} for partitions specified with total size less than the requested maxSize.
+ * @return A map from node id to the list of {@link RecordBatch} for that node, with total size less than the requested maxSize.
* TODO: There may be a starvation issue due to iteration order
*/
- public List<RecordBatch> drain(List<TopicPartition> partitions, int maxSize, long nowMs) {
- if (partitions.isEmpty())
- return Collections.emptyList();
- int size = 0;
- List<RecordBatch> ready = new ArrayList<RecordBatch>();
- /* to make starvation less likely this loop doesn't start at 0 */
- int start = drainIndex = drainIndex % partitions.size();
- do {
- TopicPartition tp = partitions.get(drainIndex);
- Deque<RecordBatch> deque = dequeFor(tp);
- if (deque != null) {
- synchronized (deque) {
- RecordBatch first = deque.peekFirst();
- if (size + first.records.sizeInBytes() > maxSize && !ready.isEmpty()) {
- // there is a rare case that a single batch size is larger than the request size due
- // to compression; in this case we will still eventually send this batch in a single
- // request
- return ready;
- } else {
- RecordBatch batch = deque.pollFirst();
- batch.records.close();
- size += batch.records.sizeInBytes();
- ready.add(batch);
- batch.drainedMs = nowMs;
+ public Map<Integer, List<RecordBatch>> drain(Cluster cluster, Set<Node> nodes, int maxSize, long nowMs) {
+ if (nodes.isEmpty())
+ return Collections.emptyMap();
+
+ Map<Integer, List<RecordBatch>> batches = new HashMap<Integer, List<RecordBatch>>();
+ for (Node node : nodes) {
+ int size = 0;
+ List<PartitionInfo> parts = cluster.partitionsForNode(node.id());
+ List<RecordBatch> ready = new ArrayList<RecordBatch>();
+ /* to make starvation less likely this loop doesn't start at 0 */
+ int start = drainIndex = drainIndex % parts.size();
+ do {
+ PartitionInfo part = parts.get(drainIndex);
+ Deque<RecordBatch> deque = dequeFor(new TopicPartition(part.topic(), part.partition()));
+ if (deque != null) {
+ synchronized (deque) {
+ RecordBatch first = deque.peekFirst();
+ if (first != null) {
+ if (size + first.records.sizeInBytes() > maxSize && !ready.isEmpty()) {
+ // there is a rare case that a single batch size is larger than the request size due
+ // to compression; in this case we will still eventually send this batch in a single
+ // request
+ break;
+ } else {
+ RecordBatch batch = deque.pollFirst();
+ batch.records.close();
+ size += batch.records.sizeInBytes();
+ ready.add(batch);
+ batch.drainedMs = nowMs;
+ }
+ }
}
}
- }
- this.drainIndex = (this.drainIndex + 1) % partitions.size();
- } while (start != drainIndex);
- return ready;
+ this.drainIndex = (this.drainIndex + 1) % parts.size();
+ } while (start != drainIndex);
+ batches.put(node.id(), ready);
+ }
+ return batches;
}
/**
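The practical effect of the reworked ready()/drain() pair is the behavior named in the commit subject: once any partition led by a broker is sendable, drain() pulls the head batch from every partition that broker leads, full or not. A minimal sketch of that interaction, assuming an accumulator accum, a cluster with two partitions tp1 and tp2 both led by node 0, and illustrative names (batchSize, maxRequestSize, now) mirroring RecordAccumulatorTest:

    accum.append(tp1, key, new byte[2 * batchSize], CompressionType.NONE, null); // oversized: tp1 is immediately sendable
    accum.append(tp2, key, value, CompressionType.NONE, null);                   // small: tp2 alone would wait for lingerMs
    Set<Node> ready = accum.ready(cluster, now);                                 // {node0}, because tp1 made the node ready
    Map<Integer, List<RecordBatch>> drained = accum.drain(cluster, ready, maxRequestSize, now);
    List<RecordBatch> forNode0 = drained.get(0);                                 // batches for BOTH tp1 and tp2: the
                                                                                 // half-empty tp2 batch piggybacks on the request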
http://git-wip-us.apache.org/repos/asf/kafka/blob/99f10739/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index f0152fa..3e83ae0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -15,15 +15,7 @@ package org.apache.kafka.clients.producer.internals;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Deque;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
+import java.util.*;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
@@ -192,18 +184,18 @@ public class Sender implements Runnable {
public void run(long nowMs) {
Cluster cluster = metadata.fetch();
// get the list of partitions with data ready to send
- List<TopicPartition> ready = this.accumulator.ready(nowMs);
+ Set<Node> ready = this.accumulator.ready(cluster, nowMs);
// should we update our metadata?
List<NetworkSend> sends = new ArrayList<NetworkSend>();
maybeUpdateMetadata(cluster, sends, nowMs);
- // prune the list of ready topics to eliminate any that we aren't ready to send yet
- List<TopicPartition> sendable = processReadyPartitions(cluster, ready, nowMs);
+ // prune the list of ready nodes to eliminate any that we aren't ready to send yet
+ Set<Node> sendable = processReadyNode(ready, nowMs);
// create produce requests
- List<RecordBatch> batches = this.accumulator.drain(sendable, this.maxRequestSize, nowMs);
- List<InFlightRequest> requests = collate(cluster, batches, nowMs);
+ Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster, sendable, this.maxRequestSize, nowMs);
+ List<InFlightRequest> requests = generateProduceRequests(batches, nowMs);
sensors.updateProduceRequestMetrics(requests);
if (ready.size() > 0) {
@@ -310,19 +302,21 @@ public class Sender implements Runnable {
}
/**
- * Process the set of topic-partitions with data ready to send. If we have a connection to the appropriate node, add
- * it to the returned set. For any partitions we have no connection to either make one, fetch the appropriate
- * metadata to be able to do so
+ * Process the set of destination nodes with data ready to send.
+ *
+ * 1) If we have an unknown leader node, force refresh the metadata.
+ * 2) If we have a connection to the appropriate node, add
+ * it to the returned set.
+ * 3) If we do not have a connection yet, initiate one.
*/
- private List<TopicPartition> processReadyPartitions(Cluster cluster, List<TopicPartition> ready, long nowMs) {
- List<TopicPartition> sendable = new ArrayList<TopicPartition>(ready.size());
- for (TopicPartition tp : ready) {
- Node node = cluster.leaderFor(tp);
+ private Set<Node> processReadyNode(Set<Node> ready, long nowMs) {
+ Set<Node> sendable = new HashSet<Node>(ready.size());
+ for (Node node : ready) {
if (node == null) {
// we don't know about this topic/partition or it has no leader, re-fetch metadata
metadata.forceUpdate();
} else if (nodeStates.isConnected(node.id()) && inFlightRequests.canSendMore(node.id())) {
- sendable.add(tp);
+ sendable.add(node);
} else if (nodeStates.canConnect(node.id(), nowMs)) {
// we don't have a connection to this node right now, make one
initiateConnect(node, nowMs);
@@ -520,19 +514,9 @@ public class Sender implements Runnable {
}
/**
- * Collate the record batches into a list of produce requests on a per-node basis
+ * Turn the collated record batches into a list of produce requests on a per-node basis
*/
- private List<InFlightRequest> collate(Cluster cluster, List<RecordBatch> batches, long nowMs) {
- Map<Integer, List<RecordBatch>> collated = new HashMap<Integer, List<RecordBatch>>();
- for (RecordBatch batch : batches) {
- Node node = cluster.leaderFor(batch.topicPartition);
- List<RecordBatch> found = collated.get(node.id());
- if (found == null) {
- found = new ArrayList<RecordBatch>();
- collated.put(node.id(), found);
- }
- found.add(batch);
- }
+ private List<InFlightRequest> generateProduceRequests(Map<Integer, List<RecordBatch>> collated, long nowMs) {
List<InFlightRequest> requests = new ArrayList<InFlightRequest>(collated.size());
for (Map.Entry<Integer, List<RecordBatch>> entry : collated.entrySet())
requests.add(produceRequest(nowMs, entry.getKey(), acks, requestTimeout, entry.getValue()));
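Condensed, the new per-iteration Sender flow is: compute readiness per node, prune by connection state, drain per node, then build one produce request per broker. A sketch restating the calls in the hunks above (all names are the Sender's own fields and methods):

    Cluster cluster = metadata.fetch();
    Set<Node> ready = this.accumulator.ready(cluster, nowMs);                  // nodes with at least one sendable batch
    Set<Node> sendable = processReadyNode(ready, nowMs);                       // keep connected nodes; null leaders force a metadata update
    Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster, sendable, this.maxRequestSize, nowMs);
    List<InFlightRequest> requests = generateProduceRequests(batches, nowMs);  // one request per broker id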
http://git-wip-us.apache.org/repos/asf/kafka/blob/99f10739/clients/src/main/java/org/apache/kafka/common/Cluster.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/Cluster.java b/clients/src/main/java/org/apache/kafka/common/Cluster.java
index 426bd1e..c62707a 100644
--- a/clients/src/main/java/org/apache/kafka/common/Cluster.java
+++ b/clients/src/main/java/org/apache/kafka/common/Cluster.java
@@ -12,6 +12,8 @@
*/
package org.apache.kafka.common;
+import org.apache.kafka.common.utils.Utils;
+
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
@@ -28,6 +30,7 @@ public final class Cluster {
private final List<Node> nodes;
private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition;
private final Map<String, List<PartitionInfo>> partitionsByTopic;
+ private final Map<Integer, List<PartitionInfo>> partitionsByNode;
/**
* Create a new cluster with the given nodes and partitions
@@ -45,18 +48,32 @@ public final class Cluster {
for (PartitionInfo p : partitions)
this.partitionsByTopicPartition.put(new TopicPartition(p.topic(), p.partition()), p);
- // index the partitions by topic and make the lists unmodifiable so we can handle them out in
- // user-facing apis without risk of the client modifying the contents
- HashMap<String, List<PartitionInfo>> parts = new HashMap<String, List<PartitionInfo>>();
+ // index the partitions by topic and node respectively, and make the lists
+ // unmodifiable so we can hand them out in user-facing apis without risk
+ // of the client modifying the contents
+ HashMap<String, List<PartitionInfo>> partsForTopic = new HashMap<String, List<PartitionInfo>>();
+ HashMap<Integer, List<PartitionInfo>> partsForNode = new HashMap<Integer, List<PartitionInfo>>();
+ for (Node n : this.nodes) {
+ partsForNode.put(n.id(), new ArrayList<PartitionInfo>());
+ }
for (PartitionInfo p : partitions) {
- if (!parts.containsKey(p.topic()))
- parts.put(p.topic(), new ArrayList<PartitionInfo>());
- List<PartitionInfo> ps = parts.get(p.topic());
- ps.add(p);
+ if (!partsForTopic.containsKey(p.topic()))
+ partsForTopic.put(p.topic(), new ArrayList<PartitionInfo>());
+ List<PartitionInfo> psTopic = partsForTopic.get(p.topic());
+ psTopic.add(p);
+
+ if (p.leader() != null) {
+ List<PartitionInfo> psNode = Utils.notNull(partsForNode.get(p.leader().id()));
+ psNode.add(p);
+ }
}
- this.partitionsByTopic = new HashMap<String, List<PartitionInfo>>(parts.size());
- for (Map.Entry<String, List<PartitionInfo>> entry : parts.entrySet())
+ this.partitionsByTopic = new HashMap<String, List<PartitionInfo>>(partsForTopic.size());
+ for (Map.Entry<String, List<PartitionInfo>> entry : partsForTopic.entrySet())
this.partitionsByTopic.put(entry.getKey(), Collections.unmodifiableList(entry.getValue()));
+ this.partitionsByNode = new HashMap<Integer, List<PartitionInfo>>(partsForNode.size());
+ for (Map.Entry<Integer, List<PartitionInfo>> entry : partsForNode.entrySet())
+ this.partitionsByNode.put(entry.getKey(), Collections.unmodifiableList(entry.getValue()));
+
}
/**
@@ -113,10 +130,19 @@ public final class Cluster {
* @param topic The topic name
* @return A list of partitions
*/
- public List<PartitionInfo> partitionsFor(String topic) {
+ public List<PartitionInfo> partitionsForTopic(String topic) {
return this.partitionsByTopic.get(topic);
}
+ /**
+ * Get the list of partitions whose leader is the given node
+ * @param nodeId The node id
+ * @return A list of partitions
+ */
+ public List<PartitionInfo> partitionsForNode(int nodeId) {
+ return this.partitionsByNode.get(nodeId);
+ }
+
@Override
public String toString() {
return "Cluster(nodes = " + this.nodes + ", partitions = " + this.partitionsByTopicPartition.values() + ")";
http://git-wip-us.apache.org/repos/asf/kafka/blob/99f10739/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
index f37ab77..c4072ae 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
@@ -17,12 +17,13 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
+import java.util.*;
import org.apache.kafka.clients.producer.internals.RecordAccumulator;
import org.apache.kafka.clients.producer.internals.RecordBatch;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.record.CompressionType;
@@ -34,11 +35,19 @@ import org.junit.Test;
public class RecordAccumulatorTest {
- private TopicPartition tp = new TopicPartition("test", 0);
+ private String topic = "test";
+ private int partition1 = 0;
+ private int partition2 = 1;
+ private Node node = new Node(0, "localhost", 1111);
+ private TopicPartition tp1 = new TopicPartition(topic, partition1);
+ private TopicPartition tp2 = new TopicPartition(topic, partition2);
+ private PartitionInfo part1 = new PartitionInfo(topic, partition1, node, null, null);
+ private PartitionInfo part2 = new PartitionInfo(topic, partition2, node, null, null);
private MockTime time = new MockTime();
private byte[] key = "key".getBytes();
private byte[] value = "value".getBytes();
private int msgSize = Records.LOG_OVERHEAD + Record.recordSize(key, value);
+ private Cluster cluster = new Cluster(Collections.singleton(node), Arrays.asList(part1, part2));
private Metrics metrics = new Metrics(time);
@Test
@@ -47,12 +56,12 @@ public class RecordAccumulatorTest {
RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 10L, 100L, false, metrics, time);
int appends = 1024 / msgSize;
for (int i = 0; i < appends; i++) {
- accum.append(tp, key, value, CompressionType.NONE, null);
- assertEquals("No partitions should be ready.", 0, accum.ready(now).size());
+ accum.append(tp1, key, value, CompressionType.NONE, null);
+ assertEquals("No partitions should be ready.", 0, accum.ready(cluster, now).size());
}
- accum.append(tp, key, value, CompressionType.NONE, null);
- assertEquals("Our partition should be ready", asList(tp), accum.ready(time.milliseconds()));
- List<RecordBatch> batches = accum.drain(asList(tp), Integer.MAX_VALUE, 0);
+ accum.append(tp1, key, value, CompressionType.NONE, null);
+ assertEquals("Our partition's leader should be ready", Collections.singleton(node), accum.ready(cluster, time.milliseconds()));
+ List<RecordBatch> batches = accum.drain(cluster, Collections.singleton(node), Integer.MAX_VALUE, 0).get(node.id());
assertEquals(1, batches.size());
RecordBatch batch = batches.get(0);
Iterator<LogEntry> iter = batch.records.iterator();
@@ -68,19 +77,19 @@ public class RecordAccumulatorTest {
public void testAppendLarge() throws Exception {
int batchSize = 512;
RecordAccumulator accum = new RecordAccumulator(batchSize, 10 * 1024, 0L, 100L, false, metrics, time);
- accum.append(tp, key, new byte[2 * batchSize], CompressionType.NONE, null);
- assertEquals("Our partition should be ready", asList(tp), accum.ready(time.milliseconds()));
+ accum.append(tp1, key, new byte[2 * batchSize], CompressionType.NONE, null);
+ assertEquals("Our partition's leader should be ready", Collections.singleton(node), accum.ready(cluster, time.milliseconds()));
}
@Test
public void testLinger() throws Exception {
long lingerMs = 10L;
RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, lingerMs, 100L, false, metrics, time);
- accum.append(tp, key, value, CompressionType.NONE, null);
- assertEquals("No partitions should be ready", 0, accum.ready(time.milliseconds()).size());
+ accum.append(tp1, key, value, CompressionType.NONE, null);
+ assertEquals("No partitions should be ready", 0, accum.ready(cluster, time.milliseconds()).size());
time.sleep(10);
- assertEquals("Our partition should be ready", asList(tp), accum.ready(time.milliseconds()));
- List<RecordBatch> batches = accum.drain(asList(tp), Integer.MAX_VALUE, 0);
+ assertEquals("Our partition's leader should be ready", Collections.singleton(node), accum.ready(cluster, time.milliseconds()));
+ List<RecordBatch> batches = accum.drain(cluster, Collections.singleton(node), Integer.MAX_VALUE, 0).get(node.id());
assertEquals(1, batches.size());
RecordBatch batch = batches.get(0);
Iterator<LogEntry> iter = batch.records.iterator();
@@ -94,14 +103,14 @@ public class RecordAccumulatorTest {
public void testPartialDrain() throws Exception {
RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 10L, 100L, false, metrics, time);
int appends = 1024 / msgSize + 1;
- List<TopicPartition> partitions = asList(new TopicPartition("test", 0), new TopicPartition("test", 1));
+ List<TopicPartition> partitions = asList(tp1, tp2);
for (TopicPartition tp : partitions) {
for (int i = 0; i < appends; i++)
accum.append(tp, key, value, CompressionType.NONE, null);
}
- assertEquals("Both partitions should be ready", 2, accum.ready(time.milliseconds()).size());
+ assertEquals("Partition's leader should be ready", Collections.singleton(node), accum.ready(cluster, time.milliseconds()));
- List<RecordBatch> batches = accum.drain(partitions, 1024, 0);
+ List<RecordBatch> batches = accum.drain(cluster, Collections.singleton(node), 1024, 0).get(node.id());
assertEquals("But due to size bound only one partition should have been retrieved", 1, batches.size());
}
@@ -109,7 +118,7 @@ public class RecordAccumulatorTest {
public void testStressfulSituation() throws Exception {
final int numThreads = 5;
final int msgs = 10000;
- final int numParts = 10;
+ final int numParts = 2;
final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 0L, 100L, true, metrics, time);
List<Thread> threads = new ArrayList<Thread>();
for (int i = 0; i < numThreads; i++) {
@@ -117,7 +126,7 @@ public class RecordAccumulatorTest {
public void run() {
for (int i = 0; i < msgs; i++) {
try {
- accum.append(new TopicPartition("test", i % numParts), key, value, CompressionType.NONE, null);
+ accum.append(new TopicPartition(topic, i % numParts), key, value, CompressionType.NONE, null);
} catch (Exception e) {
e.printStackTrace();
}
@@ -130,12 +139,14 @@ public class RecordAccumulatorTest {
int read = 0;
long now = time.milliseconds();
while (read < numThreads * msgs) {
- List<TopicPartition> tps = accum.ready(now);
- List<RecordBatch> batches = accum.drain(tps, 5 * 1024, 0);
- for (RecordBatch batch : batches) {
- for (LogEntry entry : batch.records)
- read++;
- accum.deallocate(batch);
+ Set<Node> nodes = accum.ready(cluster, now);
+ List<RecordBatch> batches = accum.drain(cluster, nodes, 5 * 1024, 0).get(node.id());
+ if (batches != null) {
+ for (RecordBatch batch : batches) {
+ for (LogEntry entry : batch.records)
+ read++;
+ accum.deallocate(batch);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/99f10739/clients/src/test/java/org/apache/kafka/common/utils/AbstractIteratorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/AbstractIteratorTest.java b/clients/src/test/java/org/apache/kafka/common/utils/AbstractIteratorTest.java
index 1df2266..c788e66 100644
--- a/clients/src/test/java/org/apache/kafka/common/utils/AbstractIteratorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/AbstractIteratorTest.java
@@ -20,13 +20,8 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-import java.util.NoSuchElementException;
+import java.util.*;
-import org.apache.kafka.common.utils.AbstractIterator;
import org.junit.Test;
public class AbstractIteratorTest {
@@ -49,7 +44,7 @@ public class AbstractIteratorTest {
@Test(expected = NoSuchElementException.class)
public void testEmptyIterator() {
- Iterator<Object> iter = new ListIterator<Object>(Arrays.asList());
+ Iterator<Object> iter = new ListIterator<Object>(Collections.emptyList());
iter.next();
}