You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jk...@apache.org on 2014/02/12 04:52:14 UTC
git commit: KAFKA-1257 Only send metadata requests to nodes with no in-flight requests.
Updated Branches:
refs/heads/trunk cdd03d199 -> e1845ba1d
KAFKA-1257 Only send metadata requests to nodes with no in-flight requests.
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e1845ba1
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e1845ba1
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e1845ba1
Branch: refs/heads/trunk
Commit: e1845ba1d80f0ba89d01d450c066aae90b416ab1
Parents: cdd03d1
Author: Jay Kreps <ja...@gmail.com>
Authored: Tue Feb 11 17:03:36 2014 -0800
Committer: Jay Kreps <ja...@gmail.com>
Committed: Tue Feb 11 19:51:43 2014 -0800
----------------------------------------------------------------------
.../clients/producer/internals/Sender.java | 26 ++++++++++++++++----
1 file changed, 21 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/e1845ba1/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index e8c194c..87dd1a6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -126,17 +126,17 @@ public class Sender implements Runnable {
// get the list of partitions with data ready to send
List<TopicPartition> ready = this.accumulator.ready(now);
- // prune the list of ready topics to eliminate any that we aren't ready to send yet
- List<TopicPartition> sendable = processReadyPartitions(cluster, ready, now);
-
// should we update our metadata?
- List<NetworkSend> sends = new ArrayList<NetworkSend>(sendable.size());
+ List<NetworkSend> sends = new ArrayList<NetworkSend>();
InFlightRequest metadataReq = maybeMetadataRequest(cluster, now);
if (metadataReq != null) {
sends.add(metadataReq.request);
this.inFlightRequests.add(metadataReq);
}
+ // prune the list of ready topics to eliminate any that we aren't ready to send yet
+ List<TopicPartition> sendable = processReadyPartitions(cluster, ready, now);
+
// create produce requests
List<RecordBatch> batches = this.accumulator.drain(sendable, this.maxRequestSize);
List<InFlightRequest> requests = collate(cluster, batches);
@@ -165,7 +165,11 @@ public class Sender implements Runnable {
private InFlightRequest maybeMetadataRequest(Cluster cluster, long now) {
if (this.metadataFetchInProgress || !metadata.needsUpdate(now))
return null;
- Node node = cluster.nextNode();
+
+ Node node = nextFreeNode(cluster);
+ if (node == null)
+ return null;
+
NodeState state = nodeState.get(node.id());
if (state == null || (state.state == ConnectionState.DISCONNECTED && now - state.lastConnectAttempt > this.reconnectBackoffMs)) {
// we don't have a connection to this node right now, make one
@@ -180,6 +184,18 @@ public class Sender implements Runnable {
}
/**
+ * @return A node with no requests currently being sent or null if no such node exists
+ */
+ private Node nextFreeNode(Cluster cluster) {
+ for (int i = 0; i < cluster.nodes().size(); i++) {
+ Node node = cluster.nextNode();
+ if (this.inFlightRequests.canSendMore(node.id()))
+ return node;
+ }
+ return null;
+ }
+
+ /**
* Start closing the sender (won't actually complete until all data is sent out)
*/
public void initiateClose() {