Posted to commits@kafka.apache.org by ij...@apache.org on 2019/08/15 06:42:00 UTC

[kafka] branch 2.3 updated: KAFKA-8788: Optimize client metadata handling with a large number of partitions (#7192)

This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.3 by this push:
     new 4d0cc43  KAFKA-8788: Optimize client metadata handling with a large number of partitions (#7192)
4d0cc43 is described below

commit 4d0cc439eea0c57aba508fae257c366edfd39028
Author: Ismael Juma <is...@juma.me.uk>
AuthorDate: Wed Aug 14 19:44:38 2019 -0700

    KAFKA-8788: Optimize client metadata handling with a large number of partitions (#7192)
    
    Credit to @lbradstreet for profiling the producer with a large number of partitions.
    
    Cache `topicMetadata`, `brokers` and `controller` in the `MetadataResponse`
    the first time they are needed, to avoid unnecessary recomputation. We were
    previously computing `brokersMap` 4 times per partition in one code path that
    was invoked from multiple places. This is a regression introduced via
    a42f16f980 and first released in 2.3.0.
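    
    For context, a condensed sketch of the hot path before this change (excerpted
    from the `MetadataResponse` diff below): `brokersMap()` rebuilt the id-to-node
    map from scratch on every call, and `topicMetadata()` invoked it once for the
    leader lookup plus once per replica, ISR and offline-replica list, i.e. 4
    times per partition:
    
        // Old code: recomputed the full map on every call rather than caching it.
        private Map<Integer, Node> brokersMap() {
            return data.brokers().stream().collect(
                Collectors.toMap(MetadataResponseBroker::nodeId,
                    b -> new Node(b.nodeId(), b.host(), b.port(), b.rack())));
        }
    
        // Inside the per-partition loop of topicMetadata():
        Node leaderNode = leader == -1 ? null : brokersMap().get(leader);
        List<Node> replicaNodes = convertToNodes(brokersMap(), partitionMetadata.replicaNodes());
        List<Node> isrNodes = convertToNodes(brokersMap(), partitionMetadata.isrNodes());
        List<Node> offlineNodes = convertToNodes(brokersMap(), partitionMetadata.offlineReplicas());
    
    The patch replaces this with a lazily initialized `Holder` (double-checked
    locking on a volatile field, see the diff below) so brokers, controller and
    topic metadata are computed at most once per response.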
    
    The `Cluster` constructor became significantly more allocation-heavy due to
    2c44e77e2f20, first released in 2.2.0. Replaced the `merge` calls with more
    verbose but more efficient code. Added a test to verify that the returned
    collections are unmodifiable.
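    
    The allocation problem, in miniature: each `merge` call allocated a singleton
    list plus a fresh concatenated copy of everything accumulated so far, so
    building each per-topic list copied quadratically. The patch appends to a
    plain `ArrayList` and wraps it once at the end; the diff spells this out with
    an explicit get/null check, and the `computeIfAbsent` line below is shorthand
    for the same idea:
    
        // Before: one singleton list + one full concatenated copy per partition.
        tmpPartitionsByTopic.merge(p.topic(), Collections.singletonList(p),
            Utils::concatListsUnmodifiable);
    
        // After (conceptually): append in place, make unmodifiable once at the end.
        tmpPartitionsByTopic.computeIfAbsent(p.topic(), t -> new ArrayList<>()).add(p);
        // ... later, once per topic:
        tmpPartitionsByTopic.put(topic, Collections.unmodifiableList(entry.getValue()));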
    
    Add `topicAuthorizedOperations` and `clusterAuthorizedOperations` to
    `MetadataResponse` and remove the `data()` method.
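    
    Call sites move from reaching into the raw response data to the new accessors
    (taken from the `KafkaAdminClient` hunks below); note that
    `topicAuthorizedOperations` returns an `Optional` that is empty when the
    topic is not present in the response:
    
        // Before:
        validAclOperations(response.data().topics().find(topicName).topicAuthorizedOperations());
        validAclOperations(response.data().clusterAuthorizedOperations());
    
        // After:
        validAclOperations(response.topicAuthorizedOperations(topicName).get());
        validAclOperations(response.clusterAuthorizedOperations());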
    
    Reviewers: Manikumar Reddy <ma...@gmail.com>, Lucas Bradstreet <lu...@gmail.com>, Colin P. McCabe <cm...@confluent.io>, Stanislav Kozlovski <st...@outlook.com>, Justine Olshan <jo...@confluent.io>
---
 .../kafka/clients/admin/KafkaAdminClient.java      |   4 +-
 .../main/java/org/apache/kafka/common/Cluster.java |  55 ++++++-
 .../kafka/common/requests/MetadataResponse.java    | 164 +++++++++++++--------
 .../clients/consumer/internals/FetcherTest.java    |   2 +-
 .../java/org/apache/kafka/common/ClusterTest.java  |  50 ++++++-
 5 files changed, 198 insertions(+), 77 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index e394458..45f7da0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -1520,7 +1520,7 @@ public class KafkaAdminClient extends AdminClient {
                     }
                     partitions.sort(Comparator.comparingInt(TopicPartitionInfo::partition));
                     TopicDescription topicDescription = new TopicDescription(topicName, isInternal, partitions,
-                        validAclOperations(response.data().topics().find(topicName).topicAuthorizedOperations()));
+                        validAclOperations(response.topicAuthorizedOperations(topicName).get()));
                     future.complete(topicDescription);
                 }
             }
@@ -1579,7 +1579,7 @@ public class KafkaAdminClient extends AdminClient {
                 controllerFuture.complete(controller(response));
                 clusterIdFuture.complete(response.clusterId());
                 authorizedOperationsFuture.complete(
-                        validAclOperations(response.data().clusterAuthorizedOperations()));
+                        validAclOperations(response.clusterAuthorizedOperations()));
             }
 
             private Node controller(MetadataResponse response) {
diff --git a/clients/src/main/java/org/apache/kafka/common/Cluster.java b/clients/src/main/java/org/apache/kafka/common/Cluster.java
index 0b01d22..752c893 100644
--- a/clients/src/main/java/org/apache/kafka/common/Cluster.java
+++ b/clients/src/main/java/org/apache/kafka/common/Cluster.java
@@ -16,8 +16,6 @@
  */
 package org.apache.kafka.common;
 
-import org.apache.kafka.common.utils.Utils;
-
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -108,23 +106,64 @@ public final class Cluster {
 
         // Index the nodes for quick lookup
         Map<Integer, Node> tmpNodesById = new HashMap<>();
-        for (Node node : nodes)
+        Map<Integer, List<PartitionInfo>> tmpPartitionsByNode = new HashMap<>(nodes.size());
+        for (Node node : nodes) {
             tmpNodesById.put(node.id(), node);
+            // Populate the map here to make it easy to add the partitions per node efficiently when iterating over
+            // the partitions
+            tmpPartitionsByNode.put(node.id(), new ArrayList<>());
+        }
         this.nodesById = Collections.unmodifiableMap(tmpNodesById);
 
         // index the partition infos by topic, topic+partition, and node
+        // note that this code is performance sensitive if there are a large number of partitions so we are careful
+        // to avoid unnecessary work
         Map<TopicPartition, PartitionInfo> tmpPartitionsByTopicPartition = new HashMap<>(partitions.size());
         Map<String, List<PartitionInfo>> tmpPartitionsByTopic = new HashMap<>();
-        Map<String, List<PartitionInfo>> tmpAvailablePartitionsByTopic = new HashMap<>();
-        Map<Integer, List<PartitionInfo>> tmpPartitionsByNode = new HashMap<>();
         for (PartitionInfo p : partitions) {
             tmpPartitionsByTopicPartition.put(new TopicPartition(p.topic(), p.partition()), p);
-            tmpPartitionsByTopic.merge(p.topic(), Collections.singletonList(p), Utils::concatListsUnmodifiable);
+            List<PartitionInfo> partitionsForTopic = tmpPartitionsByTopic.get(p.topic());
+            if (partitionsForTopic == null) {
+                partitionsForTopic = new ArrayList<>();
+                tmpPartitionsByTopic.put(p.topic(), partitionsForTopic);
+            }
+            partitionsForTopic.add(p);
             if (p.leader() != null) {
-                tmpAvailablePartitionsByTopic.merge(p.topic(), Collections.singletonList(p), Utils::concatListsUnmodifiable);
-                tmpPartitionsByNode.merge(p.leader().id(), Collections.singletonList(p), Utils::concatListsUnmodifiable);
+                // The broker guarantees that if a partition has a non-null leader, it is one of the brokers returned
+                // in the metadata response
+                List<PartitionInfo> partitionsForNode = Objects.requireNonNull(tmpPartitionsByNode.get(p.leader().id()));
+                partitionsForNode.add(p);
             }
         }
+
+        // Update the values of `tmpPartitionsByNode` to contain unmodifiable lists
+        for (Map.Entry<Integer, List<PartitionInfo>> entry : tmpPartitionsByNode.entrySet()) {
+            tmpPartitionsByNode.put(entry.getKey(), Collections.unmodifiableList(entry.getValue()));
+        }
+
+        // Populate `tmpAvailablePartitionsByTopic` and update the values of `tmpPartitionsByTopic` to contain
+        // unmodifiable lists
+        Map<String, List<PartitionInfo>> tmpAvailablePartitionsByTopic = new HashMap<>(tmpPartitionsByTopic.size());
+        for (Map.Entry<String, List<PartitionInfo>> entry : tmpPartitionsByTopic.entrySet()) {
+            String topic = entry.getKey();
+            List<PartitionInfo> partitionsForTopic = Collections.unmodifiableList(entry.getValue());
+            tmpPartitionsByTopic.put(topic, partitionsForTopic);
+            // Optimise for the common case where all partitions are available
+            boolean foundUnavailablePartition = partitionsForTopic.stream().anyMatch(p -> p.leader() == null);
+            List<PartitionInfo> availablePartitionsForTopic;
+            if (foundUnavailablePartition) {
+                availablePartitionsForTopic = new ArrayList<>(partitionsForTopic.size());
+                for (PartitionInfo p : partitionsForTopic) {
+                    if (p.leader() != null)
+                        availablePartitionsForTopic.add(p);
+                }
+                availablePartitionsForTopic = Collections.unmodifiableList(availablePartitionsForTopic);
+            } else {
+                availablePartitionsForTopic = partitionsForTopic;
+            }
+            tmpAvailablePartitionsByTopic.put(topic, availablePartitionsForTopic);
+        }
+
         this.partitionsByTopicPartition = Collections.unmodifiableMap(tmpPartitionsByTopicPartition);
         this.partitionsByTopic = Collections.unmodifiableMap(tmpPartitionsByTopic);
         this.availablePartitionsByTopic = Collections.unmodifiableMap(tmpAvailablePartitionsByTopic);
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
index 39b6180..ef5381b 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
@@ -32,6 +32,7 @@ import org.apache.kafka.common.utils.Utils;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -57,17 +58,13 @@ public class MetadataResponse extends AbstractResponse {
 
     public static final int AUTHORIZED_OPERATIONS_OMITTED = Integer.MIN_VALUE;
 
-    private MetadataResponseData data;
+    private final MetadataResponseData data;
+    private volatile Holder holder;
 
     public MetadataResponse(MetadataResponseData data) {
         this.data = data;
     }
 
-    private Map<Integer, Node> brokersMap() {
-        return data.brokers().stream().collect(
-            Collectors.toMap(MetadataResponseBroker::nodeId, b -> new Node(b.nodeId(), b.host(), b.port(), b.rack())));
-    }
-
     public MetadataResponse(Struct struct, short version) {
         this(new MetadataResponseData(struct, version));
     }
@@ -77,28 +74,6 @@ public class MetadataResponse extends AbstractResponse {
         return data.toStruct(version);
     }
 
-    public MetadataResponseData data() {
-        return data;
-    }
-
-    private List<Node> convertToNodes(Map<Integer, Node> brokers, List<Integer> brokerIds) {
-        List<Node> nodes = new ArrayList<>(brokerIds.size());
-        for (Integer brokerId : brokerIds)
-            if (brokers.containsKey(brokerId))
-                nodes.add(brokers.get(brokerId));
-            else
-                nodes.add(new Node(brokerId, "", -1));
-        return nodes;
-    }
-
-    private Node getControllerNode(int controllerId, Collection<Node> brokers) {
-        for (Node broker : brokers) {
-            if (broker.id() == controllerId)
-                return broker;
-        }
-        return null;
-    }
-
     @Override
     public int throttleTimeMs() {
         return data.throttleTimeMs();
@@ -145,7 +120,6 @@ public class MetadataResponse extends AbstractResponse {
         Set<String> internalTopics = new HashSet<>();
         List<PartitionInfo> partitions = new ArrayList<>();
         for (TopicMetadata metadata : topicMetadata()) {
-
             if (metadata.error == Errors.NONE) {
                 if (metadata.isInternal)
                     internalTopics.add(metadata.topic);
@@ -154,13 +128,30 @@ public class MetadataResponse extends AbstractResponse {
                 }
             }
         }
-        return new Cluster(data.clusterId(), brokersMap().values(), partitions, topicsByError(Errors.TOPIC_AUTHORIZATION_FAILED),
+        return new Cluster(data.clusterId(), brokers(), partitions, topicsByError(Errors.TOPIC_AUTHORIZATION_FAILED),
                 topicsByError(Errors.INVALID_TOPIC_EXCEPTION), internalTopics, controller());
     }
 
     /**
-     * Transform a topic and PartitionMetadata into PartitionInfo
-     * @return
+     * Returns a 32-bit bitfield to represent authorized operations for this topic.
+     */
+    public Optional<Integer> topicAuthorizedOperations(String topicName) {
+        MetadataResponseTopic topic = data.topics().find(topicName);
+        if (topic == null)
+            return Optional.empty();
+        else
+            return Optional.of(topic.topicAuthorizedOperations());
+    }
+
+    /**
+     * Returns a 32-bit bitfield to represent authorized operations for this cluster.
+     */
+    public int clusterAuthorizedOperations() {
+        return data.clusterAuthorizedOperations();
+    }
+
+    /**
+     * Transform a topic and PartitionMetadata into PartitionInfo.
      */
     public static PartitionInfo partitionMetaToInfo(String topic, PartitionMetadata partitionMetadata) {
         return new PartitionInfo(
@@ -172,12 +163,22 @@ public class MetadataResponse extends AbstractResponse {
                 partitionMetadata.offlineReplicas().toArray(new Node[0]));
     }
 
+    private Holder holder() {
+        if (holder == null) {
+            synchronized (data) {
+                if (holder == null)
+                    holder = new Holder(data);
+            }
+        }
+        return holder;
+    }
+
     /**
      * Get all brokers returned in metadata response
      * @return the brokers
      */
     public Collection<Node> brokers() {
-        return new ArrayList<>(brokersMap().values());
+        return holder().brokers;
     }
 
     /**
@@ -185,30 +186,7 @@ public class MetadataResponse extends AbstractResponse {
      * @return the topicMetadata
      */
     public Collection<TopicMetadata> topicMetadata() {
-        List<TopicMetadata> topicMetadataList = new ArrayList<>();
-        for (MetadataResponseTopic topicMetadata : data.topics()) {
-            Errors topicError = Errors.forCode(topicMetadata.errorCode());
-            String topic = topicMetadata.name();
-            boolean isInternal = topicMetadata.isInternal();
-            List<PartitionMetadata> partitionMetadataList = new ArrayList<>();
-
-            for (MetadataResponsePartition partitionMetadata : topicMetadata.partitions()) {
-                Errors partitionError = Errors.forCode(partitionMetadata.errorCode());
-                int partitionIndex = partitionMetadata.partitionIndex();
-                int leader = partitionMetadata.leaderId();
-                Optional<Integer> leaderEpoch = RequestUtils.getLeaderEpoch(partitionMetadata.leaderEpoch());
-                Node leaderNode = leader == -1 ? null : brokersMap().get(leader);
-                List<Node> replicaNodes = convertToNodes(brokersMap(), partitionMetadata.replicaNodes());
-                List<Node> isrNodes = convertToNodes(brokersMap(), partitionMetadata.isrNodes());
-                List<Node> offlineNodes = convertToNodes(brokersMap(), partitionMetadata.offlineReplicas());
-                partitionMetadataList.add(new PartitionMetadata(partitionError, partitionIndex, leaderNode, leaderEpoch,
-                    replicaNodes, isrNodes, offlineNodes));
-            }
-
-            topicMetadataList.add(new TopicMetadata(topicError, topic, isInternal, partitionMetadataList,
-                topicMetadata.topicAuthorizedOperations()));
-        }
-        return  topicMetadataList;
+        return holder().topicMetadata;
     }
 
     /**
@@ -216,7 +194,7 @@ public class MetadataResponse extends AbstractResponse {
      * @return the controller node or null if it doesn't exist
      */
     public Node controller() {
-        return getControllerNode(data.controllerId(), brokers());
+        return holder().controller;
     }
 
     /**
@@ -381,18 +359,76 @@ public class MetadataResponse extends AbstractResponse {
         }
     }
 
-    public static MetadataResponse prepareResponse(int throttleTimeMs, List<Node> brokers, String clusterId,
+    private static class Holder {
+        private final Collection<Node> brokers;
+        private final Node controller;
+        private final Collection<TopicMetadata> topicMetadata;
+
+        Holder(MetadataResponseData data) {
+            this.brokers = Collections.unmodifiableCollection(createBrokers(data));
+            Map<Integer, Node> brokerMap = brokers.stream().collect(Collectors.toMap(Node::id, b -> b));
+            this.topicMetadata = createTopicMetadata(data, brokerMap);
+            this.controller = brokerMap.get(data.controllerId());
+        }
+
+        private Collection<Node> createBrokers(MetadataResponseData data) {
+            return data.brokers().valuesList().stream().map(b ->
+                    new Node(b.nodeId(), b.host(), b.port(), b.rack())).collect(Collectors.toList());
+        }
+
+        private Collection<TopicMetadata> createTopicMetadata(MetadataResponseData data, Map<Integer, Node> brokerMap) {
+            List<TopicMetadata> topicMetadataList = new ArrayList<>();
+            for (MetadataResponseTopic topicMetadata : data.topics()) {
+                Errors topicError = Errors.forCode(topicMetadata.errorCode());
+                String topic = topicMetadata.name();
+                boolean isInternal = topicMetadata.isInternal();
+                List<PartitionMetadata> partitionMetadataList = new ArrayList<>();
+
+                for (MetadataResponsePartition partitionMetadata : topicMetadata.partitions()) {
+                    Errors partitionError = Errors.forCode(partitionMetadata.errorCode());
+                    int partitionIndex = partitionMetadata.partitionIndex();
+                    int leader = partitionMetadata.leaderId();
+                    Optional<Integer> leaderEpoch = RequestUtils.getLeaderEpoch(partitionMetadata.leaderEpoch());
+                    Node leaderNode = leader == -1 ? null : brokerMap.get(leader);
+                    List<Node> replicaNodes = convertToNodes(brokerMap, partitionMetadata.replicaNodes());
+                    List<Node> isrNodes = convertToNodes(brokerMap, partitionMetadata.isrNodes());
+                    List<Node> offlineNodes = convertToNodes(brokerMap, partitionMetadata.offlineReplicas());
+                    partitionMetadataList.add(new PartitionMetadata(partitionError, partitionIndex, leaderNode, leaderEpoch,
+                            replicaNodes, isrNodes, offlineNodes));
+                }
+
+                topicMetadataList.add(new TopicMetadata(topicError, topic, isInternal, partitionMetadataList,
+                        topicMetadata.topicAuthorizedOperations()));
+            }
+            return topicMetadataList;
+        }
+
+        private List<Node> convertToNodes(Map<Integer, Node> brokers, List<Integer> brokerIds) {
+            List<Node> nodes = new ArrayList<>(brokerIds.size());
+            for (Integer brokerId : brokerIds) {
+                Node node = brokers.get(brokerId);
+                if (node == null)
+                    nodes.add(new Node(brokerId, "", -1));
+                else
+                    nodes.add(node);
+            }
+            return nodes;
+        }
+
+    }
+
+    public static MetadataResponse prepareResponse(int throttleTimeMs, Collection<Node> brokers, String clusterId,
                                                    int controllerId, List<TopicMetadata> topicMetadataList,
                                                    int clusterAuthorizedOperations) {
         MetadataResponseData responseData = new MetadataResponseData();
         responseData.setThrottleTimeMs(throttleTimeMs);
-        brokers.forEach(broker -> {
+        brokers.forEach(broker ->
             responseData.brokers().add(new MetadataResponseBroker()
                 .setNodeId(broker.id())
                 .setHost(broker.host())
                 .setPort(broker.port())
-                .setRack(broker.rack()));
-        });
+                .setRack(broker.rack()))
+        );
 
         responseData.setClusterId(clusterId);
         responseData.setControllerId(controllerId);
@@ -421,13 +457,13 @@ public class MetadataResponse extends AbstractResponse {
         return new MetadataResponse(responseData);
     }
 
-    public static MetadataResponse prepareResponse(int throttleTimeMs, List<Node> brokers, String clusterId,
+    public static MetadataResponse prepareResponse(int throttleTimeMs, Collection<Node> brokers, String clusterId,
                                                    int controllerId, List<TopicMetadata> topicMetadataList) {
         return prepareResponse(throttleTimeMs, brokers, clusterId, controllerId, topicMetadataList,
                 MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED);
     }
 
-    public static MetadataResponse prepareResponse(List<Node> brokers, String clusterId, int controllerId,
+    public static MetadataResponse prepareResponse(Collection<Node> brokers, String clusterId, int controllerId,
                                                    List<TopicMetadata> topicMetadata) {
         return prepareResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, brokers, clusterId, controllerId, topicMetadata);
     }
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 44c00c4..30d3b72 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -1774,7 +1774,7 @@ public class FetcherTest {
         }
         Node controller = originalResponse.controller();
         MetadataResponse altered = MetadataResponse.prepareResponse(
-            (List<Node>) originalResponse.brokers(),
+            originalResponse.brokers(),
             originalResponse.clusterId(),
             controller != null ? controller.id() : MetadataResponse.NO_CONTROLLER_ID,
             altTopics);
diff --git a/clients/src/test/java/org/apache/kafka/common/ClusterTest.java b/clients/src/test/java/org/apache/kafka/common/ClusterTest.java
index 0a7049b..2c80d08 100644
--- a/clients/src/test/java/org/apache/kafka/common/ClusterTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/ClusterTest.java
@@ -22,19 +22,35 @@ import org.junit.Test;
 import java.net.InetSocketAddress;
 import java.util.Arrays;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 
+import static java.util.Arrays.asList;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
 
 public class ClusterTest {
 
+    private final static Node[] NODES = new Node[] {
+        new Node(0, "localhost", 99),
+        new Node(1, "localhost", 100),
+        new Node(2, "localhost", 101),
+        new Node(11, "localhost", 102)
+    };
+
+    private final static String TOPIC_A = "topicA";
+    private final static String TOPIC_B = "topicB";
+    private final static String TOPIC_C = "topicC";
+    private final static String TOPIC_D = "topicD";
+    private final static String TOPIC_E = "topicE";
+
     @Test
     public void testBootstrap() {
         String ipAddress = "140.211.11.105";
         String hostName = "www.example.com";
         Cluster cluster = Cluster.bootstrap(Arrays.asList(
-                new InetSocketAddress(ipAddress, 9002),
-                new InetSocketAddress(hostName, 9002)
+            new InetSocketAddress(ipAddress, 9002),
+            new InetSocketAddress(hostName, 9002)
         ));
         Set<String> expectedHosts = Utils.mkSet(ipAddress, hostName);
         Set<String> actualHosts = new HashSet<>();
@@ -43,4 +59,34 @@ public class ClusterTest {
         assertEquals(expectedHosts, actualHosts);
     }
 
+    @Test
+    public void testReturnUnmodifiableCollections() {
+        List<PartitionInfo> allPartitions = asList(new PartitionInfo(TOPIC_A, 0, NODES[0], NODES, NODES),
+            new PartitionInfo(TOPIC_A, 1, null, NODES, NODES),
+            new PartitionInfo(TOPIC_A, 2, NODES[2], NODES, NODES),
+            new PartitionInfo(TOPIC_B, 0, null, NODES, NODES),
+            new PartitionInfo(TOPIC_B, 1, NODES[0], NODES, NODES),
+            new PartitionInfo(TOPIC_C, 0, null, NODES, NODES),
+            new PartitionInfo(TOPIC_D, 0, NODES[1], NODES, NODES),
+            new PartitionInfo(TOPIC_E, 0, NODES[0], NODES, NODES)
+        );
+        Set<String> unauthorizedTopics = Utils.mkSet(TOPIC_C);
+        Set<String> invalidTopics = Utils.mkSet(TOPIC_D);
+        Set<String> internalTopics = Utils.mkSet(TOPIC_E);
+        Cluster cluster = new Cluster("clusterId", asList(NODES), allPartitions, unauthorizedTopics,
+            invalidTopics, internalTopics, NODES[1]);
+
+        assertThrows(UnsupportedOperationException.class, () -> cluster.invalidTopics().add("foo"));
+        assertThrows(UnsupportedOperationException.class, () -> cluster.internalTopics().add("foo"));
+        assertThrows(UnsupportedOperationException.class, () -> cluster.unauthorizedTopics().add("foo"));
+        assertThrows(UnsupportedOperationException.class, () -> cluster.topics().add("foo"));
+        assertThrows(UnsupportedOperationException.class, () -> cluster.nodes().add(NODES[3]));
+        assertThrows(UnsupportedOperationException.class, () -> cluster.partitionsForTopic(TOPIC_A).add(
+            new PartitionInfo(TOPIC_A, 3, NODES[0], NODES, NODES)));
+        assertThrows(UnsupportedOperationException.class, () -> cluster.availablePartitionsForTopic(TOPIC_B).add(
+            new PartitionInfo(TOPIC_B, 2, NODES[0], NODES, NODES)));
+        assertThrows(UnsupportedOperationException.class, () -> cluster.partitionsForNode(NODES[1].id()).add(
+            new PartitionInfo(TOPIC_B, 2, NODES[1], NODES, NODES)));
+    }
+
 }