You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jk...@apache.org on 2014/02/11 05:59:41 UTC

[2/4] git commit: KAFKA-1238 Move metadata req/resp parsing into its own classes and avoid updating cluster metadata if there are no available nodes.

KAFKA-1238 Move metadata req/resp parsing into its own classes and avoid updating cluster metadata if there are no available nodes.


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d2ec321a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d2ec321a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d2ec321a

Branch: refs/heads/trunk
Commit: d2ec321a03e654552ee364f6572437f233b0c226
Parents: 962b547
Author: Jay Kreps <ja...@gmail.com>
Authored: Fri Feb 7 09:08:33 2014 -0800
Committer: Jay Kreps <ja...@gmail.com>
Committed: Sun Feb 9 15:20:33 2014 -0800

----------------------------------------------------------------------
 .../clients/producer/internals/Sender.java      | 25 ++++---
 .../kafka/common/protocol/ProtoUtils.java       | 49 -------------
 .../kafka/common/requests/MetadataRequest.java  | 25 +++++++
 .../kafka/common/requests/MetadataResponse.java | 77 ++++++++++++++++++++
 4 files changed, 117 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d2ec321a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index b274e5e..3e10e32 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -39,12 +39,13 @@ import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ProtoUtils;
 import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.requests.MetadataRequest;
+import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.requests.RequestHeader;
 import org.apache.kafka.common.requests.RequestSend;
 import org.apache.kafka.common.requests.ResponseHeader;
 import org.apache.kafka.common.utils.Time;
 
-
 /**
  * The background thread that handles the sending of produce requests to the Kafka cluster. This thread makes metadata
  * requests to renew its view of the cluster and then sends produce requests to the appropriate nodes.
@@ -300,8 +301,12 @@ public class Sender implements Runnable {
 
     private void handleMetadataResponse(Struct body, long now) {
         this.metadataFetchInProgress = false;
-        Cluster cluster = ProtoUtils.parseMetadataResponse(body);
-        this.metadata.update(cluster, now);
+        MetadataResponse response = new MetadataResponse(body);
+        Cluster cluster = response.cluster();
+        // Don't update the cluster if the response has no nodes: the topic we want may still be in the process of
+        // being created, in which case we will get errors and no nodes until it exists.
+        if (cluster.nodes().size() > 0)
+            this.metadata.update(cluster, now);
     }
 
     /**
@@ -338,11 +343,8 @@ public class Sender implements Runnable {
      * Create a metadata request for the given topics
      */
     private InFlightRequest metadataRequest(int node, Set<String> topics) {
-        String[] ts = new String[topics.size()];
-        topics.toArray(ts);
-        Struct body = new Struct(ProtoUtils.currentRequestSchema(ApiKeys.METADATA.id));
-        body.set("topics", topics.toArray());
-        RequestSend send = new RequestSend(node, new RequestHeader(ApiKeys.METADATA.id, clientId, correlation++), body);
+        MetadataRequest metadata = new MetadataRequest(new ArrayList<String>(topics));
+        RequestSend send = new RequestSend(node, header(ApiKeys.METADATA), metadata.toStruct());
         return new InFlightRequest(true, send, null);
     }
 
@@ -403,11 +405,14 @@ public class Sender implements Runnable {
         }
         produce.set("topic_data", topicDatas.toArray());
 
-        RequestHeader header = new RequestHeader(ApiKeys.PRODUCE.id, clientId, correlation++);
-        RequestSend send = new RequestSend(destination, header, produce);
+        RequestSend send = new RequestSend(destination, header(ApiKeys.PRODUCE), produce);
         return new InFlightRequest(acks != 0, send, batchesByPartition);
     }
 
+    private RequestHeader header(ApiKeys key) { // builds the header for one request of the given API type
+        return new RequestHeader(key.id, clientId, correlation++); // post-increment: each call consumes one correlation id
+    }
+
     /**
      * Wake up the selector associated with this send thread
      */

http://git-wip-us.apache.org/repos/asf/kafka/blob/d2ec321a/clients/src/main/java/org/apache/kafka/common/protocol/ProtoUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ProtoUtils.java b/clients/src/main/java/org/apache/kafka/common/protocol/ProtoUtils.java
index 90df5d5..c2cbbbd 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ProtoUtils.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ProtoUtils.java
@@ -17,18 +17,10 @@
 package org.apache.kafka.common.protocol;
 
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
 
-import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.Node;
-import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
-
 public class ProtoUtils {
 
     private static Schema schemaFor(Schema[][] schemas, int apiKey, int version) {
@@ -70,45 +62,4 @@ public class ProtoUtils {
         return (Struct) currentResponseSchema(apiKey).read(buffer);
     }
 
-    public static Cluster parseMetadataResponse(Struct response) {
-        Map<Integer, Node> brokers = new HashMap<Integer, Node>();
-        Object[] brokerStructs = (Object[]) response.get("brokers");
-        for (int i = 0; i < brokerStructs.length; i++) {
-            Struct broker = (Struct) brokerStructs[i];
-            int nodeId = (Integer) broker.get("node_id");
-            String host = (String) broker.get("host");
-            int port = (Integer) broker.get("port");
-            brokers.put(nodeId, new Node(nodeId, host, port));
-        }
-        List<PartitionInfo> partitions = new ArrayList<PartitionInfo>();
-        Object[] topicInfos = (Object[]) response.get("topic_metadata");
-        for (int i = 0; i < topicInfos.length; i++) {
-            Struct topicInfo = (Struct) topicInfos[i];
-            short topicError = topicInfo.getShort("topic_error_code");
-            if (topicError == Errors.NONE.code()) {
-                String topic = topicInfo.getString("topic");
-                Object[] partitionInfos = (Object[]) topicInfo.get("partition_metadata");
-                for (int j = 0; j < partitionInfos.length; j++) {
-                    Struct partitionInfo = (Struct) partitionInfos[j];
-                    short partError = partitionInfo.getShort("partition_error_code");
-                    if (partError == Errors.NONE.code()) {
-                        int partition = partitionInfo.getInt("partition_id");
-                        int leader = partitionInfo.getInt("leader");
-                        Node leaderNode = leader == -1 ? null : brokers.get(leader);
-                        Object[] replicas = (Object[]) partitionInfo.get("replicas");
-                        Node[] replicaNodes = new Node[replicas.length];
-                        for (int k = 0; k < replicas.length; k++)
-                            replicaNodes[k] = brokers.get(replicas[k]);
-                        Object[] isr = (Object[]) partitionInfo.get("isr");
-                        Node[] isrNodes = new Node[isr.length];
-                        for (int k = 0; k < isr.length; k++)
-                            isrNodes[k] = brokers.get(isr[k]);
-                        partitions.add(new PartitionInfo(topic, partition, leaderNode, replicaNodes, isrNodes));
-                    }
-                }
-            }
-        }
-        return new Cluster(brokers.values(), partitions);
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d2ec321a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
new file mode 100644
index 0000000..91b9d64
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
@@ -0,0 +1,25 @@
+package org.apache.kafka.common.requests;
+
+import java.util.List;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Struct;
+
+/** A metadata request for a list of topics, convertible to its protocol {@link Struct} form. */
+public class MetadataRequest {
+
+    private final List<String> topics;
+
+    /** @param topics the topic names to put in the request; the list is kept by reference, not copied */
+    public MetadataRequest(List<String> topics) {
+        this.topics = topics;
+    }
+
+    /** @return a new Struct on the current METADATA request schema whose "topics" field holds the topic names */
+    public Struct toStruct() {
+        Struct body = new Struct(ProtoUtils.currentRequestSchema(ApiKeys.METADATA.id));
+        body.set("topics", topics.toArray());
+        return body;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/d2ec321a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
new file mode 100644
index 0000000..73b7006
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
@@ -0,0 +1,77 @@
+package org.apache.kafka.common.requests;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+
+public class MetadataResponse {
+    // Holds what is extracted from a metadata response: a Cluster plus a map of per-topic errors.
+    private final Cluster cluster;
+    private final Map<String, Errors> errors;
+    /** Wraps an already-built cluster and error map; both arguments are stored by reference, not copied. */
+    public MetadataResponse(Cluster cluster, Map<String, Errors> errors) {
+        this.cluster = cluster;
+        this.errors = errors;
+    }
+    /** Parses the "brokers" and "topic_metadata" fields of the given response struct. */
+    public MetadataResponse(Struct struct) {
+        Map<String, Errors> errors = new HashMap<String, Errors>();
+        Map<Integer, Node> brokers = new HashMap<Integer, Node>(); // node id -> Node, used to resolve leader/replica/isr ids
+        Object[] brokerStructs = (Object[]) struct.get("brokers");
+        for (int i = 0; i < brokerStructs.length; i++) {
+            Struct broker = (Struct) brokerStructs[i];
+            int nodeId = (Integer) broker.get("node_id");
+            String host = (String) broker.get("host");
+            int port = (Integer) broker.get("port");
+            brokers.put(nodeId, new Node(nodeId, host, port));
+        }
+        List<PartitionInfo> partitions = new ArrayList<PartitionInfo>();
+        Object[] topicInfos = (Object[]) struct.get("topic_metadata");
+        for (int i = 0; i < topicInfos.length; i++) {
+            Struct topicInfo = (Struct) topicInfos[i];
+            short topicError = topicInfo.getShort("topic_error_code");
+            String topic = topicInfo.getString("topic");
+            if (topicError == Errors.NONE.code()) { // partitions are only read for topics without a topic-level error
+                Object[] partitionInfos = (Object[]) topicInfo.get("partition_metadata");
+                for (int j = 0; j < partitionInfos.length; j++) {
+                    Struct partitionInfo = (Struct) partitionInfos[j];
+                    short partError = partitionInfo.getShort("partition_error_code");
+                    if (partError == Errors.NONE.code()) { // partitions with an error are skipped; the error is not recorded
+                        int partition = partitionInfo.getInt("partition_id");
+                        int leader = partitionInfo.getInt("leader");
+                        Node leaderNode = leader == -1 ? null : brokers.get(leader); // leader id -1 is stored as a null leader
+                        Object[] replicas = (Object[]) partitionInfo.get("replicas");
+                        Node[] replicaNodes = new Node[replicas.length];
+                        for (int k = 0; k < replicas.length; k++)
+                            replicaNodes[k] = brokers.get(replicas[k]); // null if the id is not in the brokers map
+                        Object[] isr = (Object[]) partitionInfo.get("isr");
+                        Node[] isrNodes = new Node[isr.length];
+                        for (int k = 0; k < isr.length; k++)
+                            isrNodes[k] = brokers.get(isr[k]); // null if the id is not in the brokers map
+                        partitions.add(new PartitionInfo(topic, partition, leaderNode, replicaNodes, isrNodes));
+                    }
+                }
+            } else { // topic-level error: record it under the topic name and read no partition data
+                errors.put(topic, Errors.forCode(topicError));
+            }
+        }
+        this.errors = errors;
+        this.cluster = new Cluster(brokers.values(), partitions); // every listed broker, plus the error-free partitions
+    }
+    /** Returns the topic-name-to-error map held by this response (the internal map itself, not a copy). */
+    public Map<String, Errors> errors() {
+        return this.errors;
+    }
+    /** Returns the cluster held by this response. */
+    public Cluster cluster() {
+        return this.cluster;
+    }
+
+}