You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2014/08/23 02:05:36 UTC
git commit: kafka-1609;
New producer metadata response handling should only exclude a
PartitionInfo when its error is LEADER_NOT_AVAILABLE; patched by Dong Lin;
reviewed by Jun Rao
Repository: kafka
Updated Branches:
refs/heads/trunk a30491ac5 -> aa775a199
kafka-1609; New producer metadata response handling should only exclude a PartitionInfo when its error is LEADER_NOT_AVAILABLE; patched by Dong Lin; reviewed by Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/aa775a19
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/aa775a19
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/aa775a19
Branch: refs/heads/trunk
Commit: aa775a199edbdbb4bcb1b3ac8f75d7e5c80fcaee
Parents: a30491a
Author: Dong Lin <li...@gmail.com>
Authored: Fri Aug 22 17:05:33 2014 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Fri Aug 22 17:05:33 2014 -0700
----------------------------------------------------------------------
.../kafka/common/requests/MetadataResponse.java | 27 +++++++++-----------
1 file changed, 12 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/aa775a19/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
index 7d90fce..d97962d 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
@@ -117,21 +117,18 @@ public class MetadataResponse extends AbstractRequestResponse {
Object[] partitionInfos = (Object[]) topicInfo.get(PARTITION_METADATA_KEY_NAME);
for (int j = 0; j < partitionInfos.length; j++) {
Struct partitionInfo = (Struct) partitionInfos[j];
- short partError = partitionInfo.getShort(PARTITION_ERROR_CODE_KEY_NAME);
- if (partError == Errors.NONE.code()) {
- int partition = partitionInfo.getInt(PARTITION_KEY_NAME);
- int leader = partitionInfo.getInt(LEADER_KEY_NAME);
- Node leaderNode = leader == -1 ? null : brokers.get(leader);
- Object[] replicas = (Object[]) partitionInfo.get(REPLICAS_KEY_NAME);
- Node[] replicaNodes = new Node[replicas.length];
- for (int k = 0; k < replicas.length; k++)
- replicaNodes[k] = brokers.get(replicas[k]);
- Object[] isr = (Object[]) partitionInfo.get(ISR_KEY_NAME);
- Node[] isrNodes = new Node[isr.length];
- for (int k = 0; k < isr.length; k++)
- isrNodes[k] = brokers.get(isr[k]);
- partitions.add(new PartitionInfo(topic, partition, leaderNode, replicaNodes, isrNodes));
- }
+ int partition = partitionInfo.getInt(PARTITION_KEY_NAME);
+ int leader = partitionInfo.getInt(LEADER_KEY_NAME);
+ Node leaderNode = leader == -1 ? null : brokers.get(leader);
+ Object[] replicas = (Object[]) partitionInfo.get(REPLICAS_KEY_NAME);
+ Node[] replicaNodes = new Node[replicas.length];
+ for (int k = 0; k < replicas.length; k++)
+ replicaNodes[k] = brokers.get(replicas[k]);
+ Object[] isr = (Object[]) partitionInfo.get(ISR_KEY_NAME);
+ Node[] isrNodes = new Node[isr.length];
+ for (int k = 0; k < isr.length; k++)
+ isrNodes[k] = brokers.get(isr[k]);
+ partitions.add(new PartitionInfo(topic, partition, leaderNode, replicaNodes, isrNodes));
}
} else {
errors.put(topic, Errors.forCode(topicError));