Posted to commits@kafka.apache.org by jk...@apache.org on 2014/02/11 05:59:41 UTC
[2/4] git commit: KAFKA-1238 Move metadata req/resp parsing into its own classes and avoid updating cluster metadata if there are no available nodes.
KAFKA-1238 Move metadata req/resp parsing into its own classes and avoid updating cluster metadata if there are no available nodes.
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d2ec321a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d2ec321a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d2ec321a
Branch: refs/heads/trunk
Commit: d2ec321a03e654552ee364f6572437f233b0c226
Parents: 962b547
Author: Jay Kreps <ja...@gmail.com>
Authored: Fri Feb 7 09:08:33 2014 -0800
Committer: Jay Kreps <ja...@gmail.com>
Committed: Sun Feb 9 15:20:33 2014 -0800
----------------------------------------------------------------------
.../clients/producer/internals/Sender.java | 25 ++++---
.../kafka/common/protocol/ProtoUtils.java | 49 -------------
.../kafka/common/requests/MetadataRequest.java | 25 +++++++
.../kafka/common/requests/MetadataResponse.java | 77 ++++++++++++++++++++
4 files changed, 117 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
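At a glance, the new public surface this commit introduces (signatures copied from the new files below; both classes live in org.apache.kafka.common.requests):

    // Request side: holds the topic list, serializes to the current wire schema.
    MetadataRequest request = new MetadataRequest(topics);    // List<String> topics
    Struct body = request.toStruct();

    // Response side: parses a response Struct into a Cluster plus per-topic errors.
    MetadataResponse response = new MetadataResponse(struct);
    Cluster cluster = response.cluster();
    Map<String, Errors> errors = response.errors();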
http://git-wip-us.apache.org/repos/asf/kafka/blob/d2ec321a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index b274e5e..3e10e32 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -39,12 +39,13 @@ import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.ProtoUtils;
import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.requests.MetadataRequest;
+import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.requests.RequestSend;
import org.apache.kafka.common.requests.ResponseHeader;
import org.apache.kafka.common.utils.Time;
-
/**
* The background thread that handles the sending of produce requests to the Kafka cluster. This thread makes metadata
* requests to renew its view of the cluster and then sends produce requests to the appropriate nodes.
@@ -300,8 +301,12 @@ public class Sender implements Runnable {
private void handleMetadataResponse(Struct body, long now) {
this.metadataFetchInProgress = false;
- Cluster cluster = ProtoUtils.parseMetadataResponse(body);
- this.metadata.update(cluster, now);
+ MetadataResponse response = new MetadataResponse(body);
+ Cluster cluster = response.cluster();
+ // don't update the cluster if there are no valid nodes...the topic we want may still be in the process of being
+ // created which means we will get errors and no nodes until it exists
+ if (cluster.nodes().size() > 0)
+ this.metadata.update(cluster, now);
}
/**
@@ -338,11 +343,8 @@ public class Sender implements Runnable {
* Create a metadata request for the given topics
*/
private InFlightRequest metadataRequest(int node, Set<String> topics) {
- String[] ts = new String[topics.size()];
- topics.toArray(ts);
- Struct body = new Struct(ProtoUtils.currentRequestSchema(ApiKeys.METADATA.id));
- body.set("topics", topics.toArray());
- RequestSend send = new RequestSend(node, new RequestHeader(ApiKeys.METADATA.id, clientId, correlation++), body);
+ MetadataRequest metadata = new MetadataRequest(new ArrayList<String>(topics));
+ RequestSend send = new RequestSend(node, header(ApiKeys.METADATA), metadata.toStruct());
return new InFlightRequest(true, send, null);
}
@@ -403,11 +405,14 @@ public class Sender implements Runnable {
}
produce.set("topic_data", topicDatas.toArray());
- RequestHeader header = new RequestHeader(ApiKeys.PRODUCE.id, clientId, correlation++);
- RequestSend send = new RequestSend(destination, header, produce);
+ RequestSend send = new RequestSend(destination, header(ApiKeys.PRODUCE), produce);
return new InFlightRequest(acks != 0, send, batchesByPartition);
}
+ private RequestHeader header(ApiKeys key) {
+ return new RequestHeader(key.id, clientId, correlation++);
+ }
+
/**
* Wake up the selector associated with this send thread
*/
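Why the nodes().size() guard matters: a metadata response for a topic that is still being auto-created carries only topic errors and an empty broker list, so the parsed Cluster has no nodes. Installing that empty view would leave the producer with no node to ask for the next refresh. A condensed sketch of the resulting update rule (metadata and now are the existing Sender field and timestamp; the comment on the implicit else branch is an editorial gloss):

    Cluster cluster = new MetadataResponse(body).cluster();
    if (cluster.nodes().size() > 0)
        this.metadata.update(cluster, now);   // adopt the fresh view
    // else: keep the current view (e.g., the bootstrap brokers) and retry later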
http://git-wip-us.apache.org/repos/asf/kafka/blob/d2ec321a/clients/src/main/java/org/apache/kafka/common/protocol/ProtoUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ProtoUtils.java b/clients/src/main/java/org/apache/kafka/common/protocol/ProtoUtils.java
index 90df5d5..c2cbbbd 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ProtoUtils.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ProtoUtils.java
@@ -17,18 +17,10 @@
package org.apache.kafka.common.protocol;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.Node;
-import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
-
public class ProtoUtils {
private static Schema schemaFor(Schema[][] schemas, int apiKey, int version) {
@@ -70,45 +62,4 @@ public class ProtoUtils {
return (Struct) currentResponseSchema(apiKey).read(buffer);
}
- public static Cluster parseMetadataResponse(Struct response) {
- Map<Integer, Node> brokers = new HashMap<Integer, Node>();
- Object[] brokerStructs = (Object[]) response.get("brokers");
- for (int i = 0; i < brokerStructs.length; i++) {
- Struct broker = (Struct) brokerStructs[i];
- int nodeId = (Integer) broker.get("node_id");
- String host = (String) broker.get("host");
- int port = (Integer) broker.get("port");
- brokers.put(nodeId, new Node(nodeId, host, port));
- }
- List<PartitionInfo> partitions = new ArrayList<PartitionInfo>();
- Object[] topicInfos = (Object[]) response.get("topic_metadata");
- for (int i = 0; i < topicInfos.length; i++) {
- Struct topicInfo = (Struct) topicInfos[i];
- short topicError = topicInfo.getShort("topic_error_code");
- if (topicError == Errors.NONE.code()) {
- String topic = topicInfo.getString("topic");
- Object[] partitionInfos = (Object[]) topicInfo.get("partition_metadata");
- for (int j = 0; j < partitionInfos.length; j++) {
- Struct partitionInfo = (Struct) partitionInfos[j];
- short partError = partitionInfo.getShort("partition_error_code");
- if (partError == Errors.NONE.code()) {
- int partition = partitionInfo.getInt("partition_id");
- int leader = partitionInfo.getInt("leader");
- Node leaderNode = leader == -1 ? null : brokers.get(leader);
- Object[] replicas = (Object[]) partitionInfo.get("replicas");
- Node[] replicaNodes = new Node[replicas.length];
- for (int k = 0; k < replicas.length; k++)
- replicaNodes[k] = brokers.get(replicas[k]);
- Object[] isr = (Object[]) partitionInfo.get("isr");
- Node[] isrNodes = new Node[isr.length];
- for (int k = 0; k < isr.length; k++)
- isrNodes[k] = brokers.get(isr[k]);
- partitions.add(new PartitionInfo(topic, partition, leaderNode, replicaNodes, isrNodes));
- }
- }
- }
- }
- return new Cluster(brokers.values(), partitions);
- }
-
}
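With parseMetadataResponse gone, ProtoUtils is reduced to schema lookup and generic (de)serialization. The surviving helpers, as the new classes use them (both calls taken verbatim from the hunks above):

    // Build an empty request body against the current schema for an API key:
    Struct body = new Struct(ProtoUtils.currentRequestSchema(ApiKeys.METADATA.id));

    // Deserialize a response buffer into a Struct for MetadataResponse to consume:
    Struct responseBody = (Struct) ProtoUtils.currentResponseSchema(ApiKeys.METADATA.id).read(buffer);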
http://git-wip-us.apache.org/repos/asf/kafka/blob/d2ec321a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
new file mode 100644
index 0000000..91b9d64
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
@@ -0,0 +1,25 @@
+package org.apache.kafka.common.requests;
+
+import java.util.List;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Struct;
+
+public class MetadataRequest {
+
+ private final List<String> topics;
+
+ public MetadataRequest(List<String> topics) {
+ this.topics = topics;
+ }
+
+ public Struct toStruct() {
+ String[] ts = new String[topics.size()];
+ topics.toArray(ts);
+ Struct body = new Struct(ProtoUtils.currentRequestSchema(ApiKeys.METADATA.id));
+ body.set("topics", topics.toArray());
+ return body;
+ }
+
+}
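One carry-over worth noting: toStruct() fills a String[] ts that is never used; body.set() receives topics.toArray() directly, just as the deleted Sender code did. A tidier equivalent, plus a hypothetical caller (editorial sketch, not part of the commit):

    public Struct toStruct() {
        Struct body = new Struct(ProtoUtils.currentRequestSchema(ApiKeys.METADATA.id));
        body.set("topics", topics.toArray());
        return body;
    }

    // Caller, as in Sender.metadataRequest():
    MetadataRequest request = new MetadataRequest(new ArrayList<String>(topics));
    RequestSend send = new RequestSend(node, header(ApiKeys.METADATA), request.toStruct());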
http://git-wip-us.apache.org/repos/asf/kafka/blob/d2ec321a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
new file mode 100644
index 0000000..73b7006
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
@@ -0,0 +1,77 @@
+package org.apache.kafka.common.requests;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+
+public class MetadataResponse {
+
+ private final Cluster cluster;
+ private final Map<String, Errors> errors;
+
+ public MetadataResponse(Cluster cluster, Map<String, Errors> errors) {
+ this.cluster = cluster;
+ this.errors = errors;
+ }
+
+ public MetadataResponse(Struct struct) {
+ Map<String, Errors> errors = new HashMap<String, Errors>();
+ Map<Integer, Node> brokers = new HashMap<Integer, Node>();
+ Object[] brokerStructs = (Object[]) struct.get("brokers");
+ for (int i = 0; i < brokerStructs.length; i++) {
+ Struct broker = (Struct) brokerStructs[i];
+ int nodeId = (Integer) broker.get("node_id");
+ String host = (String) broker.get("host");
+ int port = (Integer) broker.get("port");
+ brokers.put(nodeId, new Node(nodeId, host, port));
+ }
+ List<PartitionInfo> partitions = new ArrayList<PartitionInfo>();
+ Object[] topicInfos = (Object[]) struct.get("topic_metadata");
+ for (int i = 0; i < topicInfos.length; i++) {
+ Struct topicInfo = (Struct) topicInfos[i];
+ short topicError = topicInfo.getShort("topic_error_code");
+ String topic = topicInfo.getString("topic");
+ if (topicError == Errors.NONE.code()) {
+ Object[] partitionInfos = (Object[]) topicInfo.get("partition_metadata");
+ for (int j = 0; j < partitionInfos.length; j++) {
+ Struct partitionInfo = (Struct) partitionInfos[j];
+ short partError = partitionInfo.getShort("partition_error_code");
+ if (partError == Errors.NONE.code()) {
+ int partition = partitionInfo.getInt("partition_id");
+ int leader = partitionInfo.getInt("leader");
+ Node leaderNode = leader == -1 ? null : brokers.get(leader);
+ Object[] replicas = (Object[]) partitionInfo.get("replicas");
+ Node[] replicaNodes = new Node[replicas.length];
+ for (int k = 0; k < replicas.length; k++)
+ replicaNodes[k] = brokers.get(replicas[k]);
+ Object[] isr = (Object[]) partitionInfo.get("isr");
+ Node[] isrNodes = new Node[isr.length];
+ for (int k = 0; k < isr.length; k++)
+ isrNodes[k] = brokers.get(isr[k]);
+ partitions.add(new PartitionInfo(topic, partition, leaderNode, replicaNodes, isrNodes));
+ }
+ }
+ } else {
+ errors.put(topic, Errors.forCode(topicError));
+ }
+ }
+ this.errors = errors;
+ this.cluster = new Cluster(brokers.values(), partitions);
+ }
+
+ public Map<String, Errors> errors() {
+ return this.errors;
+ }
+
+ public Cluster cluster() {
+ return this.cluster;
+ }
+
+}
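MetadataResponse exposes both halves of the response: errors() maps each failed topic to its Errors code, while cluster() returns the brokers plus every partition that parsed cleanly (partitions with a partition-level error are dropped, matching the old ProtoUtils behavior). A hypothetical consumer, with struct standing for a response body already read off the wire:

    MetadataResponse response = new MetadataResponse(struct);
    for (Map.Entry<String, Errors> e : response.errors().entrySet())
        System.err.println("metadata error for topic " + e.getKey() + ": " + e.getValue());
    Cluster cluster = response.cluster();
    if (cluster.nodes().size() > 0) {
        // adopt the new view, e.g. metadata.update(cluster, now) as Sender does
    }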