You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2019/08/15 06:42:00 UTC
[kafka] branch 2.3 updated: KAFKA-8788: Optimize client metadata
handling with a large number of partitions (#7192)
This is an automated email from the ASF dual-hosted git repository.
ijuma pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.3 by this push:
new 4d0cc43 KAFKA-8788: Optimize client metadata handling with a large number of partitions (#7192)
4d0cc43 is described below
commit 4d0cc439eea0c57aba508fae257c366edfd39028
Author: Ismael Juma <is...@juma.me.uk>
AuthorDate: Wed Aug 14 19:44:38 2019 -0700
KAFKA-8788: Optimize client metadata handling with a large number of partitions (#7192)
Credit to @lbradstreet for profiling the producer with a large number of partitions.
Cache `topicMetadata`, `brokers` and `controller` in the `MetadataResponse`
the first time they are needed to avoid unnecessary recomputation. We were previously
computing `brokersMap` 4 times per partition in one code path that was invoked from
multiple places. This is a regression introduced via a42f16f980 and first released
in 2.3.0.
The `Cluster` constructor became significantly more allocation heavy due to
2c44e77e2f20, first released in 2.2.0. Replaced `merge` calls with more verbose,
but more efficient code. Added a test to verify that the returned collections are
unmodifiable.
Add `topicAuthorizedOperations` and `clusterAuthorizedOperations` to
`MetadataResponse` and remove `data()` method.
Reviewers: Manikumar Reddy <ma...@gmail.com>, Lucas Bradstreet <lu...@gmail.com>, Colin P. McCabe <cm...@confluent.io>, Stanislav Kozlovski <st...@outlook.com>, Justine Olshan <jo...@confluent.io>
---
.../kafka/clients/admin/KafkaAdminClient.java | 4 +-
.../main/java/org/apache/kafka/common/Cluster.java | 55 ++++++-
.../kafka/common/requests/MetadataResponse.java | 164 +++++++++++++--------
.../clients/consumer/internals/FetcherTest.java | 2 +-
.../java/org/apache/kafka/common/ClusterTest.java | 50 ++++++-
5 files changed, 198 insertions(+), 77 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index e394458..45f7da0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -1520,7 +1520,7 @@ public class KafkaAdminClient extends AdminClient {
}
partitions.sort(Comparator.comparingInt(TopicPartitionInfo::partition));
TopicDescription topicDescription = new TopicDescription(topicName, isInternal, partitions,
- validAclOperations(response.data().topics().find(topicName).topicAuthorizedOperations()));
+ validAclOperations(response.topicAuthorizedOperations(topicName).get()));
future.complete(topicDescription);
}
}
@@ -1579,7 +1579,7 @@ public class KafkaAdminClient extends AdminClient {
controllerFuture.complete(controller(response));
clusterIdFuture.complete(response.clusterId());
authorizedOperationsFuture.complete(
- validAclOperations(response.data().clusterAuthorizedOperations()));
+ validAclOperations(response.clusterAuthorizedOperations()));
}
private Node controller(MetadataResponse response) {
diff --git a/clients/src/main/java/org/apache/kafka/common/Cluster.java b/clients/src/main/java/org/apache/kafka/common/Cluster.java
index 0b01d22..752c893 100644
--- a/clients/src/main/java/org/apache/kafka/common/Cluster.java
+++ b/clients/src/main/java/org/apache/kafka/common/Cluster.java
@@ -16,8 +16,6 @@
*/
package org.apache.kafka.common;
-import org.apache.kafka.common.utils.Utils;
-
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
@@ -108,23 +106,64 @@ public final class Cluster {
// Index the nodes for quick lookup
Map<Integer, Node> tmpNodesById = new HashMap<>();
- for (Node node : nodes)
+ Map<Integer, List<PartitionInfo>> tmpPartitionsByNode = new HashMap<>(nodes.size());
+ for (Node node : nodes) {
tmpNodesById.put(node.id(), node);
+ // Populate the map here to make it easy to add the partitions per node efficiently when iterating over
+ // the partitions
+ tmpPartitionsByNode.put(node.id(), new ArrayList<>());
+ }
this.nodesById = Collections.unmodifiableMap(tmpNodesById);
// index the partition infos by topic, topic+partition, and node
+ // note that this code is performance sensitive if there are a large number of partitions so we are careful
+ // to avoid unnecessary work
Map<TopicPartition, PartitionInfo> tmpPartitionsByTopicPartition = new HashMap<>(partitions.size());
Map<String, List<PartitionInfo>> tmpPartitionsByTopic = new HashMap<>();
- Map<String, List<PartitionInfo>> tmpAvailablePartitionsByTopic = new HashMap<>();
- Map<Integer, List<PartitionInfo>> tmpPartitionsByNode = new HashMap<>();
for (PartitionInfo p : partitions) {
tmpPartitionsByTopicPartition.put(new TopicPartition(p.topic(), p.partition()), p);
- tmpPartitionsByTopic.merge(p.topic(), Collections.singletonList(p), Utils::concatListsUnmodifiable);
+ List<PartitionInfo> partitionsForTopic = tmpPartitionsByTopic.get(p.topic());
+ if (partitionsForTopic == null) {
+ partitionsForTopic = new ArrayList<>();
+ tmpPartitionsByTopic.put(p.topic(), partitionsForTopic);
+ }
+ partitionsForTopic.add(p);
if (p.leader() != null) {
- tmpAvailablePartitionsByTopic.merge(p.topic(), Collections.singletonList(p), Utils::concatListsUnmodifiable);
- tmpPartitionsByNode.merge(p.leader().id(), Collections.singletonList(p), Utils::concatListsUnmodifiable);
+ // The broker guarantees that if a partition has a non-null leader, it is one of the brokers returned
+ // in the metadata response
+ List<PartitionInfo> partitionsForNode = Objects.requireNonNull(tmpPartitionsByNode.get(p.leader().id()));
+ partitionsForNode.add(p);
}
}
+
+ // Update the values of `tmpPartitionsByNode` to contain unmodifiable lists
+ for (Map.Entry<Integer, List<PartitionInfo>> entry : tmpPartitionsByNode.entrySet()) {
+ tmpPartitionsByNode.put(entry.getKey(), Collections.unmodifiableList(entry.getValue()));
+ }
+
+ // Populate `tmpAvailablePartitionsByTopic` and update the values of `tmpPartitionsByTopic` to contain
+ // unmodifiable lists
+ Map<String, List<PartitionInfo>> tmpAvailablePartitionsByTopic = new HashMap<>(tmpPartitionsByTopic.size());
+ for (Map.Entry<String, List<PartitionInfo>> entry : tmpPartitionsByTopic.entrySet()) {
+ String topic = entry.getKey();
+ List<PartitionInfo> partitionsForTopic = Collections.unmodifiableList(entry.getValue());
+ tmpPartitionsByTopic.put(topic, partitionsForTopic);
+ // Optimise for the common case where all partitions are available
+ boolean foundUnavailablePartition = partitionsForTopic.stream().anyMatch(p -> p.leader() == null);
+ List<PartitionInfo> availablePartitionsForTopic;
+ if (foundUnavailablePartition) {
+ availablePartitionsForTopic = new ArrayList<>(partitionsForTopic.size());
+ for (PartitionInfo p : partitionsForTopic) {
+ if (p.leader() != null)
+ availablePartitionsForTopic.add(p);
+ }
+ availablePartitionsForTopic = Collections.unmodifiableList(availablePartitionsForTopic);
+ } else {
+ availablePartitionsForTopic = partitionsForTopic;
+ }
+ tmpAvailablePartitionsByTopic.put(topic, availablePartitionsForTopic);
+ }
+
this.partitionsByTopicPartition = Collections.unmodifiableMap(tmpPartitionsByTopicPartition);
this.partitionsByTopic = Collections.unmodifiableMap(tmpPartitionsByTopic);
this.availablePartitionsByTopic = Collections.unmodifiableMap(tmpAvailablePartitionsByTopic);
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
index 39b6180..ef5381b 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
@@ -32,6 +32,7 @@ import org.apache.kafka.common.utils.Utils;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -57,17 +58,13 @@ public class MetadataResponse extends AbstractResponse {
public static final int AUTHORIZED_OPERATIONS_OMITTED = Integer.MIN_VALUE;
- private MetadataResponseData data;
+ private final MetadataResponseData data;
+ private volatile Holder holder;
public MetadataResponse(MetadataResponseData data) {
this.data = data;
}
- private Map<Integer, Node> brokersMap() {
- return data.brokers().stream().collect(
- Collectors.toMap(MetadataResponseBroker::nodeId, b -> new Node(b.nodeId(), b.host(), b.port(), b.rack())));
- }
-
public MetadataResponse(Struct struct, short version) {
this(new MetadataResponseData(struct, version));
}
@@ -77,28 +74,6 @@ public class MetadataResponse extends AbstractResponse {
return data.toStruct(version);
}
- public MetadataResponseData data() {
- return data;
- }
-
- private List<Node> convertToNodes(Map<Integer, Node> brokers, List<Integer> brokerIds) {
- List<Node> nodes = new ArrayList<>(brokerIds.size());
- for (Integer brokerId : brokerIds)
- if (brokers.containsKey(brokerId))
- nodes.add(brokers.get(brokerId));
- else
- nodes.add(new Node(brokerId, "", -1));
- return nodes;
- }
-
- private Node getControllerNode(int controllerId, Collection<Node> brokers) {
- for (Node broker : brokers) {
- if (broker.id() == controllerId)
- return broker;
- }
- return null;
- }
-
@Override
public int throttleTimeMs() {
return data.throttleTimeMs();
@@ -145,7 +120,6 @@ public class MetadataResponse extends AbstractResponse {
Set<String> internalTopics = new HashSet<>();
List<PartitionInfo> partitions = new ArrayList<>();
for (TopicMetadata metadata : topicMetadata()) {
-
if (metadata.error == Errors.NONE) {
if (metadata.isInternal)
internalTopics.add(metadata.topic);
@@ -154,13 +128,30 @@ public class MetadataResponse extends AbstractResponse {
}
}
}
- return new Cluster(data.clusterId(), brokersMap().values(), partitions, topicsByError(Errors.TOPIC_AUTHORIZATION_FAILED),
+ return new Cluster(data.clusterId(), brokers(), partitions, topicsByError(Errors.TOPIC_AUTHORIZATION_FAILED),
topicsByError(Errors.INVALID_TOPIC_EXCEPTION), internalTopics, controller());
}
/**
- * Transform a topic and PartitionMetadata into PartitionInfo
- * @return
+ * Returns a 32-bit bitfield to represent authorized operations for this topic.
+ */
+ public Optional<Integer> topicAuthorizedOperations(String topicName) {
+ MetadataResponseTopic topic = data.topics().find(topicName);
+ if (topic == null)
+ return Optional.empty();
+ else
+ return Optional.of(topic.topicAuthorizedOperations());
+ }
+
+ /**
+ * Returns a 32-bit bitfield to represent authorized operations for this cluster.
+ */
+ public int clusterAuthorizedOperations() {
+ return data.clusterAuthorizedOperations();
+ }
+
+ /**
+ * Transform a topic and PartitionMetadata into PartitionInfo.
*/
public static PartitionInfo partitionMetaToInfo(String topic, PartitionMetadata partitionMetadata) {
return new PartitionInfo(
@@ -172,12 +163,22 @@ public class MetadataResponse extends AbstractResponse {
partitionMetadata.offlineReplicas().toArray(new Node[0]));
}
+ private Holder holder() {
+ if (holder == null) {
+ synchronized (data) {
+ if (holder == null)
+ holder = new Holder(data);
+ }
+ }
+ return holder;
+ }
+
/**
* Get all brokers returned in metadata response
* @return the brokers
*/
public Collection<Node> brokers() {
- return new ArrayList<>(brokersMap().values());
+ return holder().brokers;
}
/**
@@ -185,30 +186,7 @@ public class MetadataResponse extends AbstractResponse {
* @return the topicMetadata
*/
public Collection<TopicMetadata> topicMetadata() {
- List<TopicMetadata> topicMetadataList = new ArrayList<>();
- for (MetadataResponseTopic topicMetadata : data.topics()) {
- Errors topicError = Errors.forCode(topicMetadata.errorCode());
- String topic = topicMetadata.name();
- boolean isInternal = topicMetadata.isInternal();
- List<PartitionMetadata> partitionMetadataList = new ArrayList<>();
-
- for (MetadataResponsePartition partitionMetadata : topicMetadata.partitions()) {
- Errors partitionError = Errors.forCode(partitionMetadata.errorCode());
- int partitionIndex = partitionMetadata.partitionIndex();
- int leader = partitionMetadata.leaderId();
- Optional<Integer> leaderEpoch = RequestUtils.getLeaderEpoch(partitionMetadata.leaderEpoch());
- Node leaderNode = leader == -1 ? null : brokersMap().get(leader);
- List<Node> replicaNodes = convertToNodes(brokersMap(), partitionMetadata.replicaNodes());
- List<Node> isrNodes = convertToNodes(brokersMap(), partitionMetadata.isrNodes());
- List<Node> offlineNodes = convertToNodes(brokersMap(), partitionMetadata.offlineReplicas());
- partitionMetadataList.add(new PartitionMetadata(partitionError, partitionIndex, leaderNode, leaderEpoch,
- replicaNodes, isrNodes, offlineNodes));
- }
-
- topicMetadataList.add(new TopicMetadata(topicError, topic, isInternal, partitionMetadataList,
- topicMetadata.topicAuthorizedOperations()));
- }
- return topicMetadataList;
+ return holder().topicMetadata;
}
/**
@@ -216,7 +194,7 @@ public class MetadataResponse extends AbstractResponse {
* @return the controller node or null if it doesn't exist
*/
public Node controller() {
- return getControllerNode(data.controllerId(), brokers());
+ return holder().controller;
}
/**
@@ -381,18 +359,76 @@ public class MetadataResponse extends AbstractResponse {
}
}
- public static MetadataResponse prepareResponse(int throttleTimeMs, List<Node> brokers, String clusterId,
+ private static class Holder {
+ private final Collection<Node> brokers;
+ private final Node controller;
+ private final Collection<TopicMetadata> topicMetadata;
+
+ Holder(MetadataResponseData data) {
+ this.brokers = Collections.unmodifiableCollection(createBrokers(data));
+ Map<Integer, Node> brokerMap = brokers.stream().collect(Collectors.toMap(Node::id, b -> b));
+ this.topicMetadata = createTopicMetadata(data, brokerMap);
+ this.controller = brokerMap.get(data.controllerId());
+ }
+
+ private Collection<Node> createBrokers(MetadataResponseData data) {
+ return data.brokers().valuesList().stream().map(b ->
+ new Node(b.nodeId(), b.host(), b.port(), b.rack())).collect(Collectors.toList());
+ }
+
+ private Collection<TopicMetadata> createTopicMetadata(MetadataResponseData data, Map<Integer, Node> brokerMap) {
+ List<TopicMetadata> topicMetadataList = new ArrayList<>();
+ for (MetadataResponseTopic topicMetadata : data.topics()) {
+ Errors topicError = Errors.forCode(topicMetadata.errorCode());
+ String topic = topicMetadata.name();
+ boolean isInternal = topicMetadata.isInternal();
+ List<PartitionMetadata> partitionMetadataList = new ArrayList<>();
+
+ for (MetadataResponsePartition partitionMetadata : topicMetadata.partitions()) {
+ Errors partitionError = Errors.forCode(partitionMetadata.errorCode());
+ int partitionIndex = partitionMetadata.partitionIndex();
+ int leader = partitionMetadata.leaderId();
+ Optional<Integer> leaderEpoch = RequestUtils.getLeaderEpoch(partitionMetadata.leaderEpoch());
+ Node leaderNode = leader == -1 ? null : brokerMap.get(leader);
+ List<Node> replicaNodes = convertToNodes(brokerMap, partitionMetadata.replicaNodes());
+ List<Node> isrNodes = convertToNodes(brokerMap, partitionMetadata.isrNodes());
+ List<Node> offlineNodes = convertToNodes(brokerMap, partitionMetadata.offlineReplicas());
+ partitionMetadataList.add(new PartitionMetadata(partitionError, partitionIndex, leaderNode, leaderEpoch,
+ replicaNodes, isrNodes, offlineNodes));
+ }
+
+ topicMetadataList.add(new TopicMetadata(topicError, topic, isInternal, partitionMetadataList,
+ topicMetadata.topicAuthorizedOperations()));
+ }
+ return topicMetadataList;
+ }
+
+ private List<Node> convertToNodes(Map<Integer, Node> brokers, List<Integer> brokerIds) {
+ List<Node> nodes = new ArrayList<>(brokerIds.size());
+ for (Integer brokerId : brokerIds) {
+ Node node = brokers.get(brokerId);
+ if (node == null)
+ nodes.add(new Node(brokerId, "", -1));
+ else
+ nodes.add(node);
+ }
+ return nodes;
+ }
+
+ }
+
+ public static MetadataResponse prepareResponse(int throttleTimeMs, Collection<Node> brokers, String clusterId,
int controllerId, List<TopicMetadata> topicMetadataList,
int clusterAuthorizedOperations) {
MetadataResponseData responseData = new MetadataResponseData();
responseData.setThrottleTimeMs(throttleTimeMs);
- brokers.forEach(broker -> {
+ brokers.forEach(broker ->
responseData.brokers().add(new MetadataResponseBroker()
.setNodeId(broker.id())
.setHost(broker.host())
.setPort(broker.port())
- .setRack(broker.rack()));
- });
+ .setRack(broker.rack()))
+ );
responseData.setClusterId(clusterId);
responseData.setControllerId(controllerId);
@@ -421,13 +457,13 @@ public class MetadataResponse extends AbstractResponse {
return new MetadataResponse(responseData);
}
- public static MetadataResponse prepareResponse(int throttleTimeMs, List<Node> brokers, String clusterId,
+ public static MetadataResponse prepareResponse(int throttleTimeMs, Collection<Node> brokers, String clusterId,
int controllerId, List<TopicMetadata> topicMetadataList) {
return prepareResponse(throttleTimeMs, brokers, clusterId, controllerId, topicMetadataList,
MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED);
}
- public static MetadataResponse prepareResponse(List<Node> brokers, String clusterId, int controllerId,
+ public static MetadataResponse prepareResponse(Collection<Node> brokers, String clusterId, int controllerId,
List<TopicMetadata> topicMetadata) {
return prepareResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, brokers, clusterId, controllerId, topicMetadata);
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 44c00c4..30d3b72 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -1774,7 +1774,7 @@ public class FetcherTest {
}
Node controller = originalResponse.controller();
MetadataResponse altered = MetadataResponse.prepareResponse(
- (List<Node>) originalResponse.brokers(),
+ originalResponse.brokers(),
originalResponse.clusterId(),
controller != null ? controller.id() : MetadataResponse.NO_CONTROLLER_ID,
altTopics);
diff --git a/clients/src/test/java/org/apache/kafka/common/ClusterTest.java b/clients/src/test/java/org/apache/kafka/common/ClusterTest.java
index 0a7049b..2c80d08 100644
--- a/clients/src/test/java/org/apache/kafka/common/ClusterTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/ClusterTest.java
@@ -22,19 +22,35 @@ import org.junit.Test;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.HashSet;
+import java.util.List;
import java.util.Set;
+import static java.util.Arrays.asList;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
public class ClusterTest {
+ private final static Node[] NODES = new Node[] {
+ new Node(0, "localhost", 99),
+ new Node(1, "localhost", 100),
+ new Node(2, "localhost", 101),
+ new Node(11, "localhost", 102)
+ };
+
+ private final static String TOPIC_A = "topicA";
+ private final static String TOPIC_B = "topicB";
+ private final static String TOPIC_C = "topicC";
+ private final static String TOPIC_D = "topicD";
+ private final static String TOPIC_E = "topicE";
+
@Test
public void testBootstrap() {
String ipAddress = "140.211.11.105";
String hostName = "www.example.com";
Cluster cluster = Cluster.bootstrap(Arrays.asList(
- new InetSocketAddress(ipAddress, 9002),
- new InetSocketAddress(hostName, 9002)
+ new InetSocketAddress(ipAddress, 9002),
+ new InetSocketAddress(hostName, 9002)
));
Set<String> expectedHosts = Utils.mkSet(ipAddress, hostName);
Set<String> actualHosts = new HashSet<>();
@@ -43,4 +59,34 @@ public class ClusterTest {
assertEquals(expectedHosts, actualHosts);
}
+ @Test
+ public void testReturnUnmodifiableCollections() {
+ List<PartitionInfo> allPartitions = asList(new PartitionInfo(TOPIC_A, 0, NODES[0], NODES, NODES),
+ new PartitionInfo(TOPIC_A, 1, null, NODES, NODES),
+ new PartitionInfo(TOPIC_A, 2, NODES[2], NODES, NODES),
+ new PartitionInfo(TOPIC_B, 0, null, NODES, NODES),
+ new PartitionInfo(TOPIC_B, 1, NODES[0], NODES, NODES),
+ new PartitionInfo(TOPIC_C, 0, null, NODES, NODES),
+ new PartitionInfo(TOPIC_D, 0, NODES[1], NODES, NODES),
+ new PartitionInfo(TOPIC_E, 0, NODES[0], NODES, NODES)
+ );
+ Set<String> unauthorizedTopics = Utils.mkSet(TOPIC_C);
+ Set<String> invalidTopics = Utils.mkSet(TOPIC_D);
+ Set<String> internalTopics = Utils.mkSet(TOPIC_E);
+ Cluster cluster = new Cluster("clusterId", asList(NODES), allPartitions, unauthorizedTopics,
+ invalidTopics, internalTopics, NODES[1]);
+
+ assertThrows(UnsupportedOperationException.class, () -> cluster.invalidTopics().add("foo"));
+ assertThrows(UnsupportedOperationException.class, () -> cluster.internalTopics().add("foo"));
+ assertThrows(UnsupportedOperationException.class, () -> cluster.unauthorizedTopics().add("foo"));
+ assertThrows(UnsupportedOperationException.class, () -> cluster.topics().add("foo"));
+ assertThrows(UnsupportedOperationException.class, () -> cluster.nodes().add(NODES[3]));
+ assertThrows(UnsupportedOperationException.class, () -> cluster.partitionsForTopic(TOPIC_A).add(
+ new PartitionInfo(TOPIC_A, 3, NODES[0], NODES, NODES)));
+ assertThrows(UnsupportedOperationException.class, () -> cluster.availablePartitionsForTopic(TOPIC_B).add(
+ new PartitionInfo(TOPIC_B, 2, NODES[0], NODES, NODES)));
+ assertThrows(UnsupportedOperationException.class, () -> cluster.partitionsForNode(NODES[1].id()).add(
+ new PartitionInfo(TOPIC_B, 2, NODES[1], NODES, NODES)));
+ }
+
}