You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2019/12/04 03:31:13 UTC

[kafka] branch 2.4 updated: KAFKA-9261; Client should handle inconsistent leader metadata (#7772)

This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new 6e42532  KAFKA-9261; Client should handle inconsistent leader metadata (#7772)
6e42532 is described below

commit 6e42532740ec6cd2f8264fcf4b17df816b01c768
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Tue Dec 3 19:30:50 2019 -0800

    KAFKA-9261; Client should handle inconsistent leader metadata (#7772)
    
    This is a reduced scope fix for KAFKA-9261. The purpose of this patch is to ensure that
    partition leader state is kept in sync with broker metadata in MetadataCache and
    consequently in Cluster. Due to the possibility of metadata event reordering, it was
    possible for this state to be inconsistent which could lead to an NPE in some cases. The
    test case here provides a specific scenario where this could happen.
    
    Also see #7770 for additional detail.
    
    Reviewers: Ismael Juma <is...@juma.me.uk>
---
 .../java/org/apache/kafka/clients/Metadata.java    | 26 ++++++-
 .../org/apache/kafka/clients/MetadataCache.java    |  6 +-
 .../kafka/common/requests/MetadataResponse.java    | 40 ++++++-----
 .../org/apache/kafka/clients/MetadataTest.java     | 83 ++++++++++++++++++++++
 4 files changed, 132 insertions(+), 23 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index 21545d8..8372cc0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -292,6 +292,8 @@ public class Metadata implements Closeable {
                                                  Predicate<MetadataResponse.TopicMetadata> topicsToRetain) {
         Set<String> internalTopics = new HashSet<>();
         List<MetadataCache.PartitionInfoAndEpoch> partitions = new ArrayList<>();
+        Map<Integer, Node> brokersById = metadataResponse.brokersById();
+
         for (MetadataResponse.TopicMetadata metadata : metadataResponse.topicMetadata()) {
             if (!topicsToRetain.test(metadata))
                 continue;
@@ -299,12 +301,30 @@ public class Metadata implements Closeable {
             if (metadata.error() == Errors.NONE) {
                 if (metadata.isInternal())
                     internalTopics.add(metadata.topic());
-                for (MetadataResponse.PartitionMetadata partitionMetadata : metadata.partitionMetadata()) {
 
+                for (MetadataResponse.PartitionMetadata partitionMetadata : metadata.partitionMetadata()) {
                     // Even if the partition's metadata includes an error, we need to handle the update to catch new epochs
                     updatePartitionInfo(metadata.topic(), partitionMetadata, partitionInfo -> {
                         int epoch = partitionMetadata.leaderEpoch().orElse(RecordBatch.NO_PARTITION_LEADER_EPOCH);
-                        partitions.add(new MetadataCache.PartitionInfoAndEpoch(partitionInfo, epoch));
+                        Node leader = partitionInfo.leader();
+
+                        if (leader != null && !leader.equals(brokersById.get(leader.id()))) {
+                            // If we are reusing metadata from a previous response (which is possible if it
+                            // contained a larger epoch), we may not have leader information available in the
+                            // latest response. To keep the state consistent, we override the partition metadata
+                            // so that the leader is set consistently with the broker metadata
+
+                            PartitionInfo partitionInfoWithoutLeader = new PartitionInfo(
+                                    partitionInfo.topic(),
+                                    partitionInfo.partition(),
+                                    brokersById.get(leader.id()),
+                                    partitionInfo.replicas(),
+                                    partitionInfo.inSyncReplicas(),
+                                    partitionInfo.offlineReplicas());
+                            partitions.add(new MetadataCache.PartitionInfoAndEpoch(partitionInfoWithoutLeader, epoch));
+                        } else {
+                            partitions.add(new MetadataCache.PartitionInfoAndEpoch(partitionInfo, epoch));
+                        }
                     });
 
                     if (partitionMetadata.error().exception() instanceof InvalidMetadataException) {
@@ -319,7 +339,7 @@ public class Metadata implements Closeable {
             }
         }
 
-        return new MetadataCache(metadataResponse.clusterId(), new ArrayList<>(metadataResponse.brokers()), partitions,
+        return new MetadataCache(metadataResponse.clusterId(), brokersById.values(), partitions,
                 metadataResponse.topicsByError(Errors.TOPIC_AUTHORIZATION_FAILED),
                 metadataResponse.topicsByError(Errors.INVALID_TOPIC_EXCEPTION),
                 internalTopics, metadataResponse.controller());
diff --git a/clients/src/main/java/org/apache/kafka/clients/MetadataCache.java b/clients/src/main/java/org/apache/kafka/clients/MetadataCache.java
index e58da12..10b5cc9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/MetadataCache.java
+++ b/clients/src/main/java/org/apache/kafka/clients/MetadataCache.java
@@ -39,7 +39,7 @@ import java.util.stream.Collectors;
  */
 public class MetadataCache {
     private final String clusterId;
-    private final List<Node> nodes;
+    private final Collection<Node> nodes;
     private final Set<String> unauthorizedTopics;
     private final Set<String> invalidTopics;
     private final Set<String> internalTopics;
@@ -49,7 +49,7 @@ public class MetadataCache {
     private Cluster clusterInstance;
 
     MetadataCache(String clusterId,
-                  List<Node> nodes,
+                  Collection<Node> nodes,
                   Collection<PartitionInfoAndEpoch> partitions,
                   Set<String> unauthorizedTopics,
                   Set<String> invalidTopics,
@@ -59,7 +59,7 @@ public class MetadataCache {
     }
 
     MetadataCache(String clusterId,
-                  List<Node> nodes,
+                  Collection<Node> nodes,
                   Collection<PartitionInfoAndEpoch> partitions,
                   Set<String> unauthorizedTopics,
                   Set<String> invalidTopics,
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
index ef5381b..350da2b 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
@@ -20,9 +20,9 @@ import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.message.MetadataResponseData;
-import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic;
-import org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition;
 import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseBroker;
+import org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition;
+import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.Struct;
@@ -32,7 +32,6 @@ import org.apache.kafka.common.utils.Utils;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -178,6 +177,14 @@ public class MetadataResponse extends AbstractResponse {
      * @return the brokers
      */
     public Collection<Node> brokers() {
+        return holder().brokers.values();
+    }
+
+    /**
+     * Get a map of all brokers keyed by the brokerId.
+     * @return A map from the brokerId to the broker `Node` information
+     */
+    public Map<Integer, Node> brokersById() {
         return holder().brokers;
     }
 
@@ -360,23 +367,22 @@ public class MetadataResponse extends AbstractResponse {
     }
 
     private static class Holder {
-        private final Collection<Node> brokers;
+        private final Map<Integer, Node> brokers;
         private final Node controller;
         private final Collection<TopicMetadata> topicMetadata;
 
         Holder(MetadataResponseData data) {
-            this.brokers = Collections.unmodifiableCollection(createBrokers(data));
-            Map<Integer, Node> brokerMap = brokers.stream().collect(Collectors.toMap(Node::id, b -> b));
-            this.topicMetadata = createTopicMetadata(data, brokerMap);
-            this.controller = brokerMap.get(data.controllerId());
+            this.brokers = createBrokers(data);
+            this.topicMetadata = createTopicMetadata(data);
+            this.controller = brokers.get(data.controllerId());
         }
 
-        private Collection<Node> createBrokers(MetadataResponseData data) {
-            return data.brokers().valuesList().stream().map(b ->
-                    new Node(b.nodeId(), b.host(), b.port(), b.rack())).collect(Collectors.toList());
+        private Map<Integer, Node> createBrokers(MetadataResponseData data) {
+            return data.brokers().valuesList().stream().map(b -> new Node(b.nodeId(), b.host(), b.port(), b.rack()))
+                    .collect(Collectors.toMap(Node::id, b -> b));
         }
 
-        private Collection<TopicMetadata> createTopicMetadata(MetadataResponseData data, Map<Integer, Node> brokerMap) {
+        private Collection<TopicMetadata> createTopicMetadata(MetadataResponseData data) {
             List<TopicMetadata> topicMetadataList = new ArrayList<>();
             for (MetadataResponseTopic topicMetadata : data.topics()) {
                 Errors topicError = Errors.forCode(topicMetadata.errorCode());
@@ -389,10 +395,10 @@ public class MetadataResponse extends AbstractResponse {
                     int partitionIndex = partitionMetadata.partitionIndex();
                     int leader = partitionMetadata.leaderId();
                     Optional<Integer> leaderEpoch = RequestUtils.getLeaderEpoch(partitionMetadata.leaderEpoch());
-                    Node leaderNode = leader == -1 ? null : brokerMap.get(leader);
-                    List<Node> replicaNodes = convertToNodes(brokerMap, partitionMetadata.replicaNodes());
-                    List<Node> isrNodes = convertToNodes(brokerMap, partitionMetadata.isrNodes());
-                    List<Node> offlineNodes = convertToNodes(brokerMap, partitionMetadata.offlineReplicas());
+                    Node leaderNode = leader == -1 ? null : brokers.get(leader);
+                    List<Node> replicaNodes = convertToNodes(partitionMetadata.replicaNodes());
+                    List<Node> isrNodes = convertToNodes(partitionMetadata.isrNodes());
+                    List<Node> offlineNodes = convertToNodes(partitionMetadata.offlineReplicas());
                     partitionMetadataList.add(new PartitionMetadata(partitionError, partitionIndex, leaderNode, leaderEpoch,
                             replicaNodes, isrNodes, offlineNodes));
                 }
@@ -403,7 +409,7 @@ public class MetadataResponse extends AbstractResponse {
             return topicMetadataList;
         }
 
-        private List<Node> convertToNodes(Map<Integer, Node> brokers, List<Integer> brokerIds) {
+        private List<Node> convertToNodes(List<Integer> brokerIds) {
             List<Node> nodes = new ArrayList<>(brokerIds.size());
             for (Integer brokerId : brokerIds) {
                 Node node = brokers.get(brokerId);
diff --git a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
index bb34094..9a9026d 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
@@ -23,6 +23,12 @@ import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
 import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.message.MetadataResponseData;
+import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseBroker;
+import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseBrokerCollection;
+import org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition;
+import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic;
+import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopicCollection;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.utils.LogContext;
@@ -33,9 +39,12 @@ import org.apache.kafka.test.TestUtils;
 import org.junit.Test;
 
 import java.net.InetSocketAddress;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 import static org.apache.kafka.test.TestUtils.assertOptional;
 import static org.junit.Assert.assertEquals;
@@ -495,4 +504,78 @@ public class MetadataTest {
         assertEquals(metadata.fetch().nodeById(0).id(), 0);
         assertEquals(metadata.fetch().nodeById(1).id(), 1);
     }
+
+    @Test
+    public void testLeaderMetadataInconsistentWithBrokerMetadata() {
+        // Tests a reordering scenario which can lead to inconsistent leader state.
+        // A partition initially has one broker offline. That broker comes online and
+        // is elected leader. The client sees these two events in the opposite order.
+
+        TopicPartition tp = new TopicPartition("topic", 0);
+
+        Node node0 = new Node(0, "localhost", 9092);
+        Node node1 = new Node(1, "localhost", 9093);
+        Node node2 = new Node(2, "localhost", 9094);
+
+        // The first metadata received by broker (epoch=10)
+        MetadataResponsePartition firstPartitionMetadata = new MetadataResponsePartition()
+                .setPartitionIndex(tp.partition())
+                .setErrorCode(Errors.NONE.code())
+                .setLeaderEpoch(10)
+                .setLeaderId(0)
+                .setReplicaNodes(Arrays.asList(0, 1, 2))
+                .setIsrNodes(Arrays.asList(0, 1, 2))
+                .setOfflineReplicas(Collections.emptyList());
+
+        // The second metadata received has stale metadata (epoch=8)
+        MetadataResponsePartition secondPartitionMetadata = new MetadataResponsePartition()
+                .setPartitionIndex(tp.partition())
+                .setErrorCode(Errors.NONE.code())
+                .setLeaderEpoch(8)
+                .setLeaderId(1)
+                .setReplicaNodes(Arrays.asList(0, 1, 2))
+                .setIsrNodes(Arrays.asList(1, 2))
+                .setOfflineReplicas(Collections.singletonList(0));
+
+        metadata.update(new MetadataResponse(new MetadataResponseData()
+                        .setTopics(buildTopicCollection(tp.topic(), firstPartitionMetadata))
+                        .setBrokers(buildBrokerCollection(Arrays.asList(node0, node1, node2)))),
+                10L);
+
+        metadata.update(new MetadataResponse(new MetadataResponseData()
+                        .setTopics(buildTopicCollection(tp.topic(), secondPartitionMetadata))
+                        .setBrokers(buildBrokerCollection(Arrays.asList(node1, node2)))),
+                20L);
+
+        assertNull(metadata.fetch().leaderFor(tp));
+        assertEquals(Optional.of(10), metadata.lastSeenLeaderEpoch(tp));
+        assertTrue(metadata.leaderAndEpoch(tp).leader.isEmpty());
+    }
+
+    private MetadataResponseTopicCollection buildTopicCollection(String topic, MetadataResponsePartition partitionMetadata) {
+        MetadataResponseTopic topicMetadata = new MetadataResponseTopic()
+                .setErrorCode(Errors.NONE.code())
+                .setName(topic)
+                .setIsInternal(false);
+
+        topicMetadata.setPartitions(Collections.singletonList(partitionMetadata));
+
+        MetadataResponseTopicCollection topics = new MetadataResponseTopicCollection();
+        topics.add(topicMetadata);
+        return topics;
+    }
+
+    private MetadataResponseBrokerCollection buildBrokerCollection(List<Node> nodes) {
+        MetadataResponseBrokerCollection brokers = new MetadataResponseBrokerCollection();
+        for (Node node : nodes) {
+            MetadataResponseBroker broker = new MetadataResponseBroker()
+                    .setNodeId(node.id())
+                    .setHost(node.host())
+                    .setPort(node.port())
+                    .setRack(node.rack());
+            brokers.add(broker);
+        }
+        return brokers;
+    }
+
 }