You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2018/01/27 00:44:55 UTC
[kafka] branch trunk updated: MINOR: MetadataResponse#toStruct should serialize null leaders correctly. (#4449)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new e8b30e4 MINOR: MetadataResponse#toStruct should serialize null leaders correctly. (#4449)
e8b30e4 is described below
commit e8b30e4d255dce455632d809f7bd54b04e6bc6fd
Author: Colin Patrick McCabe <co...@cmccabe.xyz>
AuthorDate: Fri Jan 26 16:44:49 2018 -0800
MINOR: MetadataResponse#toStruct should serialize null leaders correctly. (#4449)
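In MetadataResponse deserialization, if the partition leader key is set
to -1, the leader is set to null. The MetadataResponse#toStruct code
should handle the reverse direction correctly as well: a null leader must
be serialized as -1 instead of being dereferenced, which would throw a
NullPointerException.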
Also fix a case in KafkaApis where we were not taking into account the
possibility of the leader being null.
RequestResponseTest should test this as well.
Reviewers: Ismael Juma <is...@juma.me.uk>, Jason Gustafson <ja...@confluent.io>
---
.../java/org/apache/kafka/common/requests/MetadataResponse.java | 8 ++++++--
.../org/apache/kafka/common/requests/RequestResponseTest.java | 3 +++
core/src/main/scala/kafka/server/KafkaApis.scala | 1 +
3 files changed, 10 insertions(+), 2 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
index 99c4ffb..cda3c07 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
@@ -464,6 +464,10 @@ public class MetadataResponse extends AbstractResponse {
return partition;
}
+ public int leaderId() {
+ return leader == null ? -1 : leader.id();
+ }
+
public Node leader() {
return leader;
}
@@ -482,7 +486,7 @@ public class MetadataResponse extends AbstractResponse {
@Override
public String toString() {
- return "(type=PartitionMetadata," +
+ return "(type=PartitionMetadata" +
", error=" + error +
", partition=" + partition +
", leader=" + leader +
@@ -531,7 +535,7 @@ public class MetadataResponse extends AbstractResponse {
Struct partitionData = topicData.instance(PARTITION_METADATA_KEY_NAME);
partitionData.set(ERROR_CODE, partitionMetadata.error.code());
partitionData.set(PARTITION_ID, partitionMetadata.partition);
- partitionData.set(LEADER_KEY_NAME, partitionMetadata.leader.id());
+ partitionData.set(LEADER_KEY_NAME, partitionMetadata.leaderId());
ArrayList<Integer> replicas = new ArrayList<>(partitionMetadata.replicas.size());
for (Node node : partitionMetadata.replicas)
replicas.add(node.id());
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index c18f5c2..2740616 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -702,6 +702,9 @@ public class RequestResponseTest {
asList(new MetadataResponse.PartitionMetadata(Errors.NONE, 1, node, replicas, isr, offlineReplicas))));
allTopicMetadata.add(new MetadataResponse.TopicMetadata(Errors.LEADER_NOT_AVAILABLE, "topic2", false,
Collections.<MetadataResponse.PartitionMetadata>emptyList()));
+ allTopicMetadata.add(new MetadataResponse.TopicMetadata(Errors.NONE, "topic3", false,
+ asList(new MetadataResponse.PartitionMetadata(Errors.LEADER_NOT_AVAILABLE, 0, null,
+ replicas, isr, offlineReplicas))));
return new MetadataResponse(asList(node), null, MetadataResponse.NO_CONTROLLER_ID, allTopicMetadata);
}
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 1ff75c0..13f5164 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -1105,6 +1105,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val coordinatorEndpoint = topicMetadata.partitionMetadata.asScala
.find(_.partition == partition)
.map(_.leader)
+ .flatMap(p => Option(p))
coordinatorEndpoint match {
case Some(endpoint) if !endpoint.isEmpty =>
--
To stop receiving notification emails like this one, please contact
jgus@apache.org.