You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2014/08/23 02:05:36 UTC

git commit: kafka-1609; New producer metadata response handling should only exclude a PartitionInfo when its error is LEADER_NOT_AVAILABLE; patched by Dong Lin; reviewed by Jun Rao

Repository: kafka
Updated Branches:
  refs/heads/trunk a30491ac5 -> aa775a199


kafka-1609; New producer metadata response handling should only exclude a PartitionInfo when its error is LEADER_NOT_AVAILABLE; patched by Dong Lin; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/aa775a19
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/aa775a19
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/aa775a19

Branch: refs/heads/trunk
Commit: aa775a199edbdbb4bcb1b3ac8f75d7e5c80fcaee
Parents: a30491a
Author: Dong Lin <li...@gmail.com>
Authored: Fri Aug 22 17:05:33 2014 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Fri Aug 22 17:05:33 2014 -0700

----------------------------------------------------------------------
 .../kafka/common/requests/MetadataResponse.java | 27 +++++++++-----------
 1 file changed, 12 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/aa775a19/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
index 7d90fce..d97962d 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
@@ -117,21 +117,18 @@ public class MetadataResponse extends AbstractRequestResponse {
                 Object[] partitionInfos = (Object[]) topicInfo.get(PARTITION_METADATA_KEY_NAME);
                 for (int j = 0; j < partitionInfos.length; j++) {
                     Struct partitionInfo = (Struct) partitionInfos[j];
-                    short partError = partitionInfo.getShort(PARTITION_ERROR_CODE_KEY_NAME);
-                    if (partError == Errors.NONE.code()) {
-                        int partition = partitionInfo.getInt(PARTITION_KEY_NAME);
-                        int leader = partitionInfo.getInt(LEADER_KEY_NAME);
-                        Node leaderNode = leader == -1 ? null : brokers.get(leader);
-                        Object[] replicas = (Object[]) partitionInfo.get(REPLICAS_KEY_NAME);
-                        Node[] replicaNodes = new Node[replicas.length];
-                        for (int k = 0; k < replicas.length; k++)
-                            replicaNodes[k] = brokers.get(replicas[k]);
-                        Object[] isr = (Object[]) partitionInfo.get(ISR_KEY_NAME);
-                        Node[] isrNodes = new Node[isr.length];
-                        for (int k = 0; k < isr.length; k++)
-                            isrNodes[k] = brokers.get(isr[k]);
-                        partitions.add(new PartitionInfo(topic, partition, leaderNode, replicaNodes, isrNodes));
-                    }
+                    int partition = partitionInfo.getInt(PARTITION_KEY_NAME);
+                    int leader = partitionInfo.getInt(LEADER_KEY_NAME);
+                    Node leaderNode = leader == -1 ? null : brokers.get(leader);
+                    Object[] replicas = (Object[]) partitionInfo.get(REPLICAS_KEY_NAME);
+                    Node[] replicaNodes = new Node[replicas.length];
+                    for (int k = 0; k < replicas.length; k++)
+                        replicaNodes[k] = brokers.get(replicas[k]);
+                    Object[] isr = (Object[]) partitionInfo.get(ISR_KEY_NAME);
+                    Node[] isrNodes = new Node[isr.length];
+                    for (int k = 0; k < isr.length; k++)
+                        isrNodes[k] = brokers.get(isr[k]);
+                    partitions.add(new PartitionInfo(topic, partition, leaderNode, replicaNodes, isrNodes));
                 }
             } else {
                 errors.put(topic, Errors.forCode(topicError));