You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jk...@apache.org on 2014/02/12 04:52:14 UTC

git commit: KAFKA-1257 Only send metadata requests to nodes with no in-flight requests.

Updated Branches:
  refs/heads/trunk cdd03d199 -> e1845ba1d


KAFKA-1257 Only send metadata requests to nodes with no in-flight requests.


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e1845ba1
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e1845ba1
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e1845ba1

Branch: refs/heads/trunk
Commit: e1845ba1d80f0ba89d01d450c066aae90b416ab1
Parents: cdd03d1
Author: Jay Kreps <ja...@gmail.com>
Authored: Tue Feb 11 17:03:36 2014 -0800
Committer: Jay Kreps <ja...@gmail.com>
Committed: Tue Feb 11 19:51:43 2014 -0800

----------------------------------------------------------------------
 .../clients/producer/internals/Sender.java      | 26 ++++++++++++++++----
 1 file changed, 21 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/e1845ba1/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index e8c194c..87dd1a6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -126,17 +126,17 @@ public class Sender implements Runnable {
         // get the list of partitions with data ready to send
         List<TopicPartition> ready = this.accumulator.ready(now);
 
-        // prune the list of ready topics to eliminate any that we aren't ready to send yet
-        List<TopicPartition> sendable = processReadyPartitions(cluster, ready, now);
-
         // should we update our metadata?
-        List<NetworkSend> sends = new ArrayList<NetworkSend>(sendable.size());
+        List<NetworkSend> sends = new ArrayList<NetworkSend>();
         InFlightRequest metadataReq = maybeMetadataRequest(cluster, now);
         if (metadataReq != null) {
             sends.add(metadataReq.request);
             this.inFlightRequests.add(metadataReq);
         }
 
+        // prune the list of ready topics to eliminate any that we aren't ready to send yet
+        List<TopicPartition> sendable = processReadyPartitions(cluster, ready, now);
+
         // create produce requests
         List<RecordBatch> batches = this.accumulator.drain(sendable, this.maxRequestSize);
         List<InFlightRequest> requests = collate(cluster, batches);
@@ -165,7 +165,11 @@ public class Sender implements Runnable {
     private InFlightRequest maybeMetadataRequest(Cluster cluster, long now) {
         if (this.metadataFetchInProgress || !metadata.needsUpdate(now))
             return null;
-        Node node = cluster.nextNode();
+
+        Node node = nextFreeNode(cluster);
+        if (node == null)
+            return null;
+
         NodeState state = nodeState.get(node.id());
         if (state == null || (state.state == ConnectionState.DISCONNECTED && now - state.lastConnectAttempt > this.reconnectBackoffMs)) {
             // we don't have a connection to this node right now, make one
@@ -180,6 +184,18 @@ public class Sender implements Runnable {
     }
 
     /**
+     * @return A node with no requests currently being sent or null if no such node exists
+     */
+    private Node nextFreeNode(Cluster cluster) {
+        for (int i = 0; i < cluster.nodes().size(); i++) {
+            Node node = cluster.nextNode();
+            if (this.inFlightRequests.canSendMore(node.id()))
+                return node;
+        }
+        return null;
+    }
+
+    /**
      * Start closing the sender (won't actually complete until all data is sent out)
      */
     public void initiateClose() {