You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2020/01/10 01:04:45 UTC
[kafka] 03/04: Loosen strict leader node requirement
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch KAFKA-9261
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 524a50a29b47d7ceb04f3699be27642493639e02
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Tue Dec 3 10:10:06 2019 -0800
Loosen strict leader node requirement
---
.../main/java/org/apache/kafka/common/Cluster.java | 21 +++---
.../apache/kafka/clients/MetadataCacheTest.java | 85 ++++++++++++++++++++++
2 files changed, 94 insertions(+), 12 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/Cluster.java b/clients/src/main/java/org/apache/kafka/common/Cluster.java
index c765cdc..a6bf763 100644
--- a/clients/src/main/java/org/apache/kafka/common/Cluster.java
+++ b/clients/src/main/java/org/apache/kafka/common/Cluster.java
@@ -122,18 +122,15 @@ public final class Cluster {
Map<String, List<PartitionInfo>> tmpPartitionsByTopic = new HashMap<>();
for (PartitionInfo p : partitions) {
tmpPartitionsByTopicPartition.put(new TopicPartition(p.topic(), p.partition()), p);
- List<PartitionInfo> partitionsForTopic = tmpPartitionsByTopic.get(p.topic());
- if (partitionsForTopic == null) {
- partitionsForTopic = new ArrayList<>();
- tmpPartitionsByTopic.put(p.topic(), partitionsForTopic);
- }
- partitionsForTopic.add(p);
- if (p.leader() != null) {
- // The broker guarantees that if a partition has a non-null leader, it is one of the brokers returned
- // in the metadata response
- List<PartitionInfo> partitionsForNode = Objects.requireNonNull(tmpPartitionsByNode.get(p.leader().id()));
- partitionsForNode.add(p);
- }
+ tmpPartitionsByTopic.computeIfAbsent(p.topic(), topic -> new ArrayList<>()).add(p);
+
+ // The leader may not be known
+ if (p.leader() == null || p.leader().isEmpty())
+ continue;
+
+ // If it is known, its node information should be available
+ List<PartitionInfo> partitionsForNode = Objects.requireNonNull(tmpPartitionsByNode.get(p.leader().id()));
+ partitionsForNode.add(p);
}
// Update the values of `tmpPartitionsByNode` to contain unmodifiable lists
diff --git a/clients/src/test/java/org/apache/kafka/clients/MetadataCacheTest.java b/clients/src/test/java/org/apache/kafka/clients/MetadataCacheTest.java
new file mode 100644
index 0000000..2968b80
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/MetadataCacheTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class MetadataCacheTest {
+
+ @Test
+ public void testMissingLeaderEndpoint() {
+ // Although the broker attempts to ensure leader information is available, the
+ // client metadata cache may retain partition metadata across multiple responses.
+ // For example, separate responses may contain conflicting leader epochs for
+ // separate partitions and the client will always retain the highest.
+
+ TopicPartition topicPartition = new TopicPartition("topic", 0);
+
+ MetadataResponse.PartitionMetadata partitionMetadata = new MetadataResponse.PartitionMetadata(
+ Errors.NONE,
+ topicPartition,
+ 5,
+ Optional.of(10),
+ Arrays.asList(5, 6, 7),
+ Arrays.asList(5, 6, 7),
+ Collections.emptyList());
+
+ Map<Integer, Node> nodesById = new HashMap<>();
+ nodesById.put(6, new Node(6, "localhost", 2077));
+ nodesById.put(7, new Node(7, "localhost", 2078));
+ nodesById.put(8, new Node(8, "localhost", 2079));
+
+ MetadataCache cache = new MetadataCache("clusterId",
+ nodesById,
+ Collections.singleton(partitionMetadata),
+ Collections.emptySet(),
+ Collections.emptySet(),
+ Collections.emptySet(),
+ null);
+
+ Cluster cluster = cache.cluster();
+ assertNull(cluster.leaderFor(topicPartition));
+
+ PartitionInfo partitionInfo = cluster.partition(topicPartition);
+ Map<Integer, Node> replicas = Arrays.stream(partitionInfo.replicas())
+ .collect(Collectors.toMap(Node::id, Function.identity()));
+ assertNull(partitionInfo.leader());
+ assertEquals(3, replicas.size());
+ assertTrue(replicas.get(5).isEmpty());
+ assertEquals(nodesById.get(6), replicas.get(6));
+ assertEquals(nodesById.get(7), replicas.get(7));
+ }
+
+}
\ No newline at end of file