You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2020/01/10 01:04:45 UTC

[kafka] 03/04: Loosen strict leader node requirement

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch KAFKA-9261
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 524a50a29b47d7ceb04f3699be27642493639e02
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Tue Dec 3 10:10:06 2019 -0800

    Loosen strict leader node requirement
---
 .../main/java/org/apache/kafka/common/Cluster.java | 21 +++---
 .../apache/kafka/clients/MetadataCacheTest.java    | 85 ++++++++++++++++++++++
 2 files changed, 94 insertions(+), 12 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/Cluster.java b/clients/src/main/java/org/apache/kafka/common/Cluster.java
index c765cdc..a6bf763 100644
--- a/clients/src/main/java/org/apache/kafka/common/Cluster.java
+++ b/clients/src/main/java/org/apache/kafka/common/Cluster.java
@@ -122,18 +122,15 @@ public final class Cluster {
         Map<String, List<PartitionInfo>> tmpPartitionsByTopic = new HashMap<>();
         for (PartitionInfo p : partitions) {
             tmpPartitionsByTopicPartition.put(new TopicPartition(p.topic(), p.partition()), p);
-            List<PartitionInfo> partitionsForTopic = tmpPartitionsByTopic.get(p.topic());
-            if (partitionsForTopic == null) {
-                partitionsForTopic = new ArrayList<>();
-                tmpPartitionsByTopic.put(p.topic(), partitionsForTopic);
-            }
-            partitionsForTopic.add(p);
-            if (p.leader() != null) {
-                // The broker guarantees that if a partition has a non-null leader, it is one of the brokers returned
-                // in the metadata response
-                List<PartitionInfo> partitionsForNode = Objects.requireNonNull(tmpPartitionsByNode.get(p.leader().id()));
-                partitionsForNode.add(p);
-            }
+            tmpPartitionsByTopic.computeIfAbsent(p.topic(), topic -> new ArrayList<>()).add(p);
+
+            // The leader may not be known
+            if (p.leader() == null || p.leader().isEmpty())
+                continue;
+
+            // If it is known, its node information should be available
+            List<PartitionInfo> partitionsForNode = Objects.requireNonNull(tmpPartitionsByNode.get(p.leader().id()));
+            partitionsForNode.add(p);
         }
 
         // Update the values of `tmpPartitionsByNode` to contain unmodifiable lists
diff --git a/clients/src/test/java/org/apache/kafka/clients/MetadataCacheTest.java b/clients/src/test/java/org/apache/kafka/clients/MetadataCacheTest.java
new file mode 100644
index 0000000..2968b80
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/MetadataCacheTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class MetadataCacheTest {
+
+    @Test
+    public void testMissingLeaderEndpoint() {
+        // Although the broker attempts to ensure leader information is available, the
+        // client metadata cache may retain partition metadata across multiple responses.
+        // For example, separate responses may contain conflicting leader epochs for
+        // separate partitions and the client will always retain the highest.
+
+        TopicPartition topicPartition = new TopicPartition("topic", 0);
+
+        MetadataResponse.PartitionMetadata partitionMetadata = new MetadataResponse.PartitionMetadata(
+                Errors.NONE,
+                topicPartition,
+                5,
+                Optional.of(10),
+                Arrays.asList(5, 6, 7),
+                Arrays.asList(5, 6, 7),
+                Collections.emptyList());
+
+        Map<Integer, Node> nodesById = new HashMap<>();
+        nodesById.put(6, new Node(6, "localhost", 2077));
+        nodesById.put(7, new Node(7, "localhost", 2078));
+        nodesById.put(8, new Node(8, "localhost", 2079));
+
+        MetadataCache cache = new MetadataCache("clusterId",
+                nodesById,
+                Collections.singleton(partitionMetadata),
+                Collections.emptySet(),
+                Collections.emptySet(),
+                Collections.emptySet(),
+                null);
+
+        Cluster cluster = cache.cluster();
+        assertNull(cluster.leaderFor(topicPartition));
+
+        PartitionInfo partitionInfo = cluster.partition(topicPartition);
+        Map<Integer, Node> replicas = Arrays.stream(partitionInfo.replicas())
+                .collect(Collectors.toMap(Node::id, Function.identity()));
+        assertNull(partitionInfo.leader());
+        assertEquals(3, replicas.size());
+        assertTrue(replicas.get(5).isEmpty());
+        assertEquals(nodesById.get(6), replicas.get(6));
+        assertEquals(nodesById.get(7), replicas.get(7));
+    }
+
+}
\ No newline at end of file