You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2018/01/27 00:44:55 UTC

[kafka] branch trunk updated: MINOR: MetadataResponse#toStruct should serialize null leaders correctly. (#4449)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e8b30e4  MINOR: MetadataResponse#toStruct should serialize null leaders correctly. (#4449)
e8b30e4 is described below

commit e8b30e4d255dce455632d809f7bd54b04e6bc6fd
Author: Colin Patrick McCabe <co...@cmccabe.xyz>
AuthorDate: Fri Jan 26 16:44:49 2018 -0800

    MINOR: MetadataResponse#toStruct should serialize null leaders correctly. (#4449)
    
    In MetadataResponse deserialization, if the partition leader key is set
    to -1, the leader is set to null.  MetadataResponse#toStruct should handle
    the reverse case as well, serializing a null leader as -1 instead of
    dereferencing it.
    
    Also fix a case in KafkaApis where we were not taking into account the
    possibility of the leader being null.
    
    RequestResponseTest now includes a partition with a null leader so that
    this case is covered as well.
    
    Reviewers: Ismael Juma <is...@juma.me.uk>, Jason Gustafson <ja...@confluent.io>
---
 .../java/org/apache/kafka/common/requests/MetadataResponse.java   | 8 ++++++--
 .../org/apache/kafka/common/requests/RequestResponseTest.java     | 3 +++
 core/src/main/scala/kafka/server/KafkaApis.scala                  | 1 +
 3 files changed, 10 insertions(+), 2 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
index 99c4ffb..cda3c07 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
@@ -464,6 +464,10 @@ public class MetadataResponse extends AbstractResponse {
             return partition;
         }
 
+        public int leaderId() {
+            return leader == null ? -1 : leader.id();
+        }
+
         public Node leader() {
             return leader;
         }
@@ -482,7 +486,7 @@ public class MetadataResponse extends AbstractResponse {
 
         @Override
         public String toString() {
-            return "(type=PartitionMetadata," +
+            return "(type=PartitionMetadata" +
                     ", error=" + error +
                     ", partition=" + partition +
                     ", leader=" + leader +
@@ -531,7 +535,7 @@ public class MetadataResponse extends AbstractResponse {
                 Struct partitionData = topicData.instance(PARTITION_METADATA_KEY_NAME);
                 partitionData.set(ERROR_CODE, partitionMetadata.error.code());
                 partitionData.set(PARTITION_ID, partitionMetadata.partition);
-                partitionData.set(LEADER_KEY_NAME, partitionMetadata.leader.id());
+                partitionData.set(LEADER_KEY_NAME, partitionMetadata.leaderId());
                 ArrayList<Integer> replicas = new ArrayList<>(partitionMetadata.replicas.size());
                 for (Node node : partitionMetadata.replicas)
                     replicas.add(node.id());
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index c18f5c2..2740616 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -702,6 +702,9 @@ public class RequestResponseTest {
                 asList(new MetadataResponse.PartitionMetadata(Errors.NONE, 1, node, replicas, isr, offlineReplicas))));
         allTopicMetadata.add(new MetadataResponse.TopicMetadata(Errors.LEADER_NOT_AVAILABLE, "topic2", false,
                 Collections.<MetadataResponse.PartitionMetadata>emptyList()));
+        allTopicMetadata.add(new MetadataResponse.TopicMetadata(Errors.NONE, "topic3", false,
+            asList(new MetadataResponse.PartitionMetadata(Errors.LEADER_NOT_AVAILABLE, 0, null,
+                replicas, isr, offlineReplicas))));
 
         return new MetadataResponse(asList(node), null, MetadataResponse.NO_CONTROLLER_ID, allTopicMetadata);
     }
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 1ff75c0..13f5164 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -1105,6 +1105,7 @@ class KafkaApis(val requestChannel: RequestChannel,
           val coordinatorEndpoint = topicMetadata.partitionMetadata.asScala
             .find(_.partition == partition)
             .map(_.leader)
+            .flatMap(p => Option(p))
 
           coordinatorEndpoint match {
             case Some(endpoint) if !endpoint.isEmpty =>

-- 
To stop receiving notification emails like this one, please contact
jgus@apache.org.