You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/10/30 20:21:01 UTC

[jira] [Commented] (KAFKA-7567) Clean up internal metadata usage for consistency and extensibility

    [ https://issues.apache.org/jira/browse/KAFKA-7567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16669283#comment-16669283 ] 

ASF GitHub Bot commented on KAFKA-7567:
---------------------------------------

hachikuji closed pull request #5813: KAFKA-7567; Clean up internal metadata usage for consistency and extensibility
URL: https://github.com/apache/kafka/pull/5813
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
index fe83c5c54e9..4d933243655 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
@@ -45,9 +45,12 @@
     private ClientUtils() {
     }
 
-    public static List<InetSocketAddress> parseAndValidateAddresses(List<String> urls, String clientDnsLookup) {
+    public static List<InetSocketAddress> parseAndValidateAddresses(List<String> urls, String clientDnsLookupConfig) {
+        return parseAndValidateAddresses(urls, ClientDnsLookup.forConfig(clientDnsLookupConfig));
+    }
+
+    public static List<InetSocketAddress> parseAndValidateAddresses(List<String> urls, ClientDnsLookup clientDnsLookup) {
         List<InetSocketAddress> addresses = new ArrayList<>();
-        ClientDnsLookup clientDnsLookupBehaviour = ClientDnsLookup.forConfig(clientDnsLookup);
         for (String url : urls) {
             if (url != null && !url.isEmpty()) {
                 try {
@@ -56,7 +59,7 @@ private ClientUtils() {
                     if (host == null || port == null)
                         throw new ConfigException("Invalid url in " + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG + ": " + url);
 
-                    if (clientDnsLookupBehaviour == ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY) {
+                    if (clientDnsLookup == ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY) {
                         InetAddress[] inetAddresses = InetAddress.getAllByName(host);
                         for (InetAddress inetAddress : inetAddresses) {
                             String resolvedCanonicalName = inetAddress.getCanonicalHostName();
diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index 0abb5c45ba6..1028de755a2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -18,15 +18,15 @@
 
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.internals.ClusterResourceListeners;
-import org.apache.kafka.common.Node;
-import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.internals.ClusterResourceListeners;
+import org.apache.kafka.common.requests.MetadataResponse;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
+import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -64,6 +64,7 @@
     private long lastSuccessfulRefreshMs;
     private AuthenticationException authenticationException;
     private Cluster cluster;
+    private Set<String> unavailableTopics = Collections.emptySet();
     private boolean needUpdate;
     /* Topics with expiry time */
     private final Map<String, Long> topics;
@@ -74,7 +75,9 @@
     private final boolean topicExpiryEnabled;
     private boolean isClosed;
 
-    public Metadata(long refreshBackoffMs, long metadataExpireMs, boolean allowAutoTopicCreation) {
+    public Metadata(long refreshBackoffMs,
+                    long metadataExpireMs,
+                    boolean allowAutoTopicCreation) {
         this(refreshBackoffMs, metadataExpireMs, allowAutoTopicCreation, false, new ClusterResourceListeners());
     }
 
@@ -88,8 +91,11 @@ public Metadata(long refreshBackoffMs, long metadataExpireMs, boolean allowAutoT
      * @param topicExpiryEnabled If true, enable expiry of unused topics
      * @param clusterResourceListeners List of ClusterResourceListeners which will receive metadata updates.
      */
-    public Metadata(long refreshBackoffMs, long metadataExpireMs, boolean allowAutoTopicCreation,
-                    boolean topicExpiryEnabled, ClusterResourceListeners clusterResourceListeners) {
+    public Metadata(long refreshBackoffMs,
+                    long metadataExpireMs,
+                    boolean allowAutoTopicCreation,
+                    boolean topicExpiryEnabled,
+                    ClusterResourceListeners clusterResourceListeners) {
         this.refreshBackoffMs = refreshBackoffMs;
         this.metadataExpireMs = metadataExpireMs;
         this.allowAutoTopicCreation = allowAutoTopicCreation;
@@ -231,17 +237,23 @@ public synchronized boolean containsTopic(String topic) {
         return this.topics.containsKey(topic);
     }
 
+    public synchronized void bootstrap(List<InetSocketAddress> addresses, long now) {
+        this.needUpdate = true;
+        this.lastRefreshMs = now;
+        this.lastSuccessfulRefreshMs = now;
+        this.version += 1;
+        this.cluster = Cluster.bootstrap(addresses);
+    }
+
     /**
      * Updates the cluster metadata. If topic expiry is enabled, expiry time
      * is set for topics if required and expired topics are removed from the metadata.
      *
-     * @param newCluster the cluster containing metadata for topics with valid metadata
-     * @param unavailableTopics topics which are non-existent or have one or more partitions whose
-     *        leader is not known
+     * @param metadataResponse metadata response received from the broker
      * @param now current time in milliseconds
      */
-    public synchronized void update(Cluster newCluster, Set<String> unavailableTopics, long now) {
-        Objects.requireNonNull(newCluster, "cluster should not be null");
+    public synchronized void update(MetadataResponse metadataResponse, long now) {
+        Objects.requireNonNull(metadataResponse, "Metadata response cannot be null");
         if (isClosed())
             throw new IllegalStateException("Update requested after metadata close");
 
@@ -264,32 +276,34 @@ else if (expireMs <= now) {
             }
         }
 
-        for (Listener listener: listeners)
-            listener.onMetadataUpdate(newCluster, unavailableTopics);
-
         String previousClusterId = cluster.clusterResource().clusterId();
 
+        this.cluster = metadataResponse.cluster();
+        this.unavailableTopics = metadataResponse.unavailableTopics();
+
+        fireListeners(cluster, unavailableTopics);
+
         if (this.needMetadataForAllTopics) {
             // the listener may change the interested topics, which could cause another metadata refresh.
             // If we have already fetched all topics, however, another fetch should be unnecessary.
             this.needUpdate = false;
-            this.cluster = getClusterForCurrentTopics(newCluster);
-        } else {
-            this.cluster = newCluster;
+            this.cluster = metadataResponse.cluster(topics.keySet());
         }
 
-        // The bootstrap cluster is guaranteed not to have any useful information
-        if (!newCluster.isBootstrapConfigured()) {
-            String newClusterId = newCluster.clusterResource().clusterId();
-            if (newClusterId == null ? previousClusterId != null : !newClusterId.equals(previousClusterId))
-                log.info("Cluster ID: {}", newClusterId);
-            clusterResourceListeners.onUpdate(newCluster.clusterResource());
-        }
+        String newClusterId = cluster.clusterResource().clusterId();
+        if (newClusterId == null ? previousClusterId != null : !newClusterId.equals(previousClusterId))
+            log.info("Cluster ID: {}", newClusterId);
+        clusterResourceListeners.onUpdate(cluster.clusterResource());
 
         notifyAll();
         log.debug("Updated cluster metadata version {} to {}", this.version, this.cluster);
     }
 
+    private void fireListeners(Cluster newCluster, Set<String> unavailableTopics) {
+        for (Listener listener: listeners)
+            listener.onMetadataUpdate(newCluster, unavailableTopics);
+    }
+
     /**
      * Record an attempt to update the metadata that failed. We need to keep track of this
      * to avoid retrying immediately.
@@ -390,32 +404,4 @@ private synchronized void requestUpdateForNewTopics() {
         requestUpdate();
     }
 
-    private Cluster getClusterForCurrentTopics(Cluster cluster) {
-        Set<String> unauthorizedTopics = new HashSet<>();
-        Set<String> invalidTopics = new HashSet<>();
-        Collection<PartitionInfo> partitionInfos = new ArrayList<>();
-        List<Node> nodes = Collections.emptyList();
-        Set<String> internalTopics = Collections.emptySet();
-        Node controller = null;
-        String clusterId = null;
-        if (cluster != null) {
-            clusterId = cluster.clusterResource().clusterId();
-            internalTopics = cluster.internalTopics();
-            unauthorizedTopics.addAll(cluster.unauthorizedTopics());
-            unauthorizedTopics.retainAll(this.topics.keySet());
-
-            invalidTopics.addAll(cluster.invalidTopics());
-            invalidTopics.addAll(this.cluster.invalidTopics());
-
-            for (String topic : this.topics.keySet()) {
-                List<PartitionInfo> partitionInfoList = cluster.partitionsForTopic(topic);
-                if (!partitionInfoList.isEmpty()) {
-                    partitionInfos.addAll(partitionInfoList);
-                }
-            }
-            nodes = cluster.nodes();
-            controller  = cluster.controller();
-        }
-        return new Cluster(clusterId, nodes, partitionInfos, unauthorizedTopics, invalidTopics, internalTopics, controller);
-    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index c6f0c0b3fb6..902ef1c3fda 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -962,7 +962,6 @@ public void handleAuthenticationFailure(AuthenticationException exception) {
         @Override
         public void handleCompletedMetadataResponse(RequestHeader requestHeader, long now, MetadataResponse response) {
             this.metadataFetchInProgress = false;
-            Cluster cluster = response.cluster();
 
             // If any partition has leader with missing listeners, log a few for diagnosing broker configuration
             // issues. This could be a transient issue if listeners were added dynamically to brokers.
@@ -984,11 +983,11 @@ public void handleCompletedMetadataResponse(RequestHeader requestHeader, long no
 
             // don't update the cluster if there are no valid nodes...the topic we want may still be in the process of being
             // created which means we will get errors and no nodes until it exists
-            if (cluster.nodes().size() > 0) {
-                this.metadata.update(cluster, response.unavailableTopics(), now);
-            } else {
+            if (response.brokers().isEmpty()) {
                 log.trace("Ignoring empty metadata response with correlation id {}.", requestHeader.correlationId());
                 this.metadata.failedUpdate(now, null);
+            } else {
+                this.metadata.update(response, now);
             }
         }
 
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 33a47868714..39817df4a67 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -339,6 +339,10 @@ static KafkaAdminClient createInternal(AdminClientConfig config, TimeoutProcesso
             AdminMetadataManager metadataManager = new AdminMetadataManager(logContext,
                 config.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG),
                 config.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG));
+            List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(
+                    config.getList(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG),
+                    config.getString(AdminClientConfig.CLIENT_DNS_LOOKUP_CONFIG));
+            metadataManager.update(Cluster.bootstrap(addresses), time.milliseconds());
             List<MetricsReporter> reporters = config.getConfiguredInstances(AdminClientConfig.METRIC_REPORTER_CLASSES_CONFIG,
                 MetricsReporter.class,
                 Collections.singletonMap(AdminClientConfig.CLIENT_ID_CONFIG, clientId));
@@ -375,25 +379,25 @@ static KafkaAdminClient createInternal(AdminClientConfig config, TimeoutProcesso
             closeQuietly(networkClient, "NetworkClient");
             closeQuietly(selector, "Selector");
             closeQuietly(channelBuilder, "ChannelBuilder");
-            throw new KafkaException("Failed create new KafkaAdminClient", exc);
+            throw new KafkaException("Failed to create new KafkaAdminClient", exc);
         }
     }
 
-    static KafkaAdminClient createInternal(AdminClientConfig config, KafkaClient client, Time time) {
+    static KafkaAdminClient createInternal(AdminClientConfig config,
+                                           AdminMetadataManager metadataManager,
+                                           KafkaClient client,
+                                           Time time) {
         Metrics metrics = null;
         String clientId = generateClientId(config);
 
         try {
             metrics = new Metrics(new MetricConfig(), new LinkedList<>(), time);
             LogContext logContext = createLogContext(clientId);
-            AdminMetadataManager metadataManager = new AdminMetadataManager(logContext,
-                config.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG),
-                config.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG));
             return new KafkaAdminClient(config, clientId, time, metadataManager, metrics,
                 client, null, logContext);
         } catch (Throwable exc) {
             closeQuietly(metrics, "Metrics");
-            throw new KafkaException("Failed create new KafkaAdminClient", exc);
+            throw new KafkaException("Failed to create new KafkaAdminClient", exc);
         }
     }
 
@@ -414,10 +418,6 @@ private KafkaAdminClient(AdminClientConfig config,
         this.log = logContext.logger(KafkaAdminClient.class);
         this.time = time;
         this.metadataManager = metadataManager;
-        List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(
-            config.getList(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG),
-            config.getString(AdminClientConfig.CLIENT_DNS_LOOKUP_CONFIG));
-        metadataManager.update(Cluster.bootstrap(addresses), time.milliseconds());
         this.metrics = metrics;
         this.client = client;
         this.runnable = new AdminClientRunnable();
@@ -1399,12 +1399,12 @@ public ListTopicsResult listTopics(final ListTopicsOptions options) {
             @Override
             void handleResponse(AbstractResponse abstractResponse) {
                 MetadataResponse response = (MetadataResponse) abstractResponse;
-                Cluster cluster = response.cluster();
                 Map<String, TopicListing> topicListing = new HashMap<>();
-                for (String topicName : cluster.topics()) {
-                    boolean internal = cluster.internalTopics().contains(topicName);
-                    if (!internal || options.shouldListInternal())
-                        topicListing.put(topicName, new TopicListing(topicName, internal));
+                for (MetadataResponse.TopicMetadata topicMetadata : response.topicMetadata()) {
+                    String topicName = topicMetadata.topic();
+                    boolean isInternal = topicMetadata.isInternal();
+                    if (!topicMetadata.isInternal() || options.shouldListInternal())
+                        topicListing.put(topicName, new TopicListing(topicName, isInternal));
                 }
                 topicListingFuture.complete(topicListing);
             }
@@ -2551,12 +2551,11 @@ public ListConsumerGroupsResult listConsumerGroups(ListConsumerGroupsOptions opt
             @Override
             void handleResponse(AbstractResponse abstractResponse) {
                 MetadataResponse metadataResponse = (MetadataResponse) abstractResponse;
-                Cluster cluster = metadataResponse.cluster();
-
-                if (cluster.nodes().isEmpty())
+                Collection<Node> nodes = metadataResponse.brokers();
+                if (nodes.isEmpty())
                     throw new StaleMetadataException("Metadata fetch failed due to missing broker list");
 
-                HashSet<Node> allNodes = new HashSet<>(cluster.nodes());
+                HashSet<Node> allNodes = new HashSet<>(nodes);
                 final ListConsumerGroupsResults results = new ListConsumerGroupsResults(allNodes, all);
 
                 for (final Node node : allNodes) {
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 714cd94d6b2..065e6631df4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -712,7 +712,7 @@ private KafkaConsumer(ConsumerConfig config,
             List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(
                     config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG),
                     config.getString(ConsumerConfig.CLIENT_DNS_LOOKUP_CONFIG));
-            this.metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), 0);
+            this.metadata.bootstrap(addresses, time.milliseconds());
             String metricGrpPrefix = "consumer";
             ConsumerMetrics metricsRegistry = new ConsumerMetrics(metricsTags.keySet(), "consumer");
             ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config, time);
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index e91677113d2..c9052e3b51f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -416,7 +416,7 @@ public KafkaProducer(Properties properties, Serializer<K> keySerializer, Seriali
             } else {
                 this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG),
                     true, true, clusterResourceListeners);
-                this.metadata.update(Cluster.bootstrap(addresses), Collections.emptySet(), time.milliseconds());
+                this.metadata.bootstrap(addresses, time.milliseconds());
             }
             this.errors = this.metrics.sensor("errors");
             this.sender = newSender(logContext, kafkaClient, this.metadata);
diff --git a/clients/src/main/java/org/apache/kafka/common/Cluster.java b/clients/src/main/java/org/apache/kafka/common/Cluster.java
index 24a18db0d9f..8a773bc31b3 100644
--- a/clients/src/main/java/org/apache/kafka/common/Cluster.java
+++ b/clients/src/main/java/org/apache/kafka/common/Cluster.java
@@ -79,12 +79,12 @@ public Cluster(String clusterId,
      * @param partitions Information about a subset of the topic-partitions this cluster hosts
      */
     public Cluster(String clusterId,
-        Collection<Node> nodes,
-        Collection<PartitionInfo> partitions,
-        Set<String> unauthorizedTopics,
-        Set<String> invalidTopics,
-        Set<String> internalTopics,
-        Node controller) {
+                   Collection<Node> nodes,
+                   Collection<PartitionInfo> partitions,
+                   Set<String> unauthorizedTopics,
+                   Set<String> invalidTopics,
+                   Set<String> internalTopics,
+                   Node controller) {
         this(clusterId, false, nodes, partitions, unauthorizedTopics, invalidTopics, internalTopics, controller);
     }
 
diff --git a/clients/src/main/java/org/apache/kafka/common/PartitionInfo.java b/clients/src/main/java/org/apache/kafka/common/PartitionInfo.java
index 44cd4f4d215..7ed54f098a3 100644
--- a/clients/src/main/java/org/apache/kafka/common/PartitionInfo.java
+++ b/clients/src/main/java/org/apache/kafka/common/PartitionInfo.java
@@ -103,10 +103,12 @@ public String toString() {
     /* Extract the node ids from each item in the array and format for display */
     private String formatNodeIds(Node[] nodes) {
         StringBuilder b = new StringBuilder("[");
-        for (int i = 0; i < nodes.length; i++) {
-            b.append(nodes[i].idString());
-            if (i < nodes.length - 1)
-                b.append(',');
+        if (nodes != null) {
+            for (int i = 0; i < nodes.length; i++) {
+                b.append(nodes[i].idString());
+                if (i < nodes.length - 1)
+                    b.append(',');
+            }
         }
         b.append("]");
         return b.toString();
diff --git a/clients/src/main/java/org/apache/kafka/common/internals/Topic.java b/clients/src/main/java/org/apache/kafka/common/internals/Topic.java
index a5ef3357dd1..619477d3260 100644
--- a/clients/src/main/java/org/apache/kafka/common/internals/Topic.java
+++ b/clients/src/main/java/org/apache/kafka/common/internals/Topic.java
@@ -28,7 +28,7 @@
     public static final String TRANSACTION_STATE_TOPIC_NAME = "__transaction_state";
     public static final String LEGAL_CHARS = "[a-zA-Z0-9._-]";
 
-    private static final Set<String> INTERNAL_TOPICS = Collections.unmodifiableSet(
+    public static final Set<String> INTERNAL_TOPICS = Collections.unmodifiableSet(
             Utils.mkSet(GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME));
 
     private static final int MAX_NAME_LENGTH = 249;
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
index c78066f93bb..fc67571f1ff 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
@@ -368,9 +368,16 @@ public int throttleTimeMs() {
      * @return the cluster snapshot
      */
     public Cluster cluster() {
+        return cluster(null);
+    }
+
+    public Cluster cluster(Set<String> topicsToRetain) {
         Set<String> internalTopics = new HashSet<>();
         List<PartitionInfo> partitions = new ArrayList<>();
         for (TopicMetadata metadata : topicMetadata) {
+            if (topicsToRetain != null && !topicsToRetain.contains(metadata.topic))
+                continue;
+
             if (metadata.error == Errors.NONE) {
                 if (metadata.isInternal)
                     internalTopics.add(metadata.topic);
diff --git a/clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java b/clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java
index 227afea7e6f..afe5a5d1e7d 100644
--- a/clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java
@@ -112,11 +112,11 @@ public void testResolveDnsLookupAllIps() throws UnknownHostException {
     }
 
     private List<InetSocketAddress> checkWithoutLookup(String... url) {
-        return ClientUtils.parseAndValidateAddresses(Arrays.asList(url), ClientDnsLookup.DEFAULT.toString());
+        return ClientUtils.parseAndValidateAddresses(Arrays.asList(url), ClientDnsLookup.DEFAULT);
     }
 
     private List<InetSocketAddress> checkWithLookup(List<String> url) {
-        return ClientUtils.parseAndValidateAddresses(url, ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY.toString());
+        return ClientUtils.parseAndValidateAddresses(url, ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY);
     }
 
 }
diff --git a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
index 969921eceeb..b8c9d64577a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
@@ -16,25 +16,26 @@
  */
 package org.apache.kafka.clients;
 
-import java.net.InetSocketAddress;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicReference;
-
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.internals.ClusterResourceListeners;
-import org.apache.kafka.common.Node;
-import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.internals.ClusterResourceListeners;
+import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.test.MockClusterResourceListener;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
 import org.junit.Test;
 
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -54,10 +55,18 @@ public void tearDown() {
         assertNull("Exception in background thread : " + backgroundError.get(), backgroundError.get());
     }
 
+    private static MetadataResponse emptyMetadataResponse() {
+        return new MetadataResponse(
+                Collections.emptyList(),
+                null,
+                -1,
+                Collections.emptyList());
+    }
+    
     @Test
     public void testMetadata() throws Exception {
         long time = 0;
-        metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
+        metadata.update(emptyMetadataResponse(), time);
         assertFalse("No update needed.", metadata.timeToNextUpdate(time) == 0);
         metadata.requestUpdate();
         assertFalse("Still no updated needed due to backoff", metadata.timeToNextUpdate(time) == 0);
@@ -72,7 +81,8 @@ public void testMetadata() throws Exception {
         // This simulates the metadata update sequence in KafkaProducer
         while (t1.isAlive() || t2.isAlive()) {
             if (metadata.timeToNextUpdate(time) == 0) {
-                metadata.update(TestUtils.singletonCluster(topic, 1), Collections.<String>emptySet(), time);
+                MetadataResponse response = TestUtils.metadataUpdateWith(1, Collections.singletonMap(topic, 1));
+                metadata.update(response, time);
                 time += refreshBackoffMs;
             }
             Thread.sleep(1);
@@ -87,7 +97,7 @@ public void testMetadata() throws Exception {
     @Test
     public void testMetadataAwaitAfterClose() throws InterruptedException {
         long time = 0;
-        metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
+        metadata.update(emptyMetadataResponse(), time);
         assertFalse("No update needed.", metadata.timeToNextUpdate(time) == 0);
         metadata.requestUpdate();
         assertFalse("Still no updated needed due to backoff", metadata.timeToNextUpdate(time) == 0);
@@ -105,7 +115,7 @@ public void testMetadataAwaitAfterClose() throws InterruptedException {
     @Test(expected = IllegalStateException.class)
     public void testMetadataUpdateAfterClose() {
         metadata.close();
-        metadata.update(Cluster.empty(), Collections.<String>emptySet(), 1000);
+        metadata.update(emptyMetadataResponse(), 1000);
     }
 
     private static void checkTimeToNextUpdate(long refreshBackoffMs, long metadataExpireMs) {
@@ -126,7 +136,7 @@ private static void checkTimeToNextUpdate(long refreshBackoffMs, long metadataEx
         assertEquals(0, metadata.timeToNextUpdate(now));
 
         // lastSuccessfulRefreshMs updated to now.
-        metadata.update(Cluster.empty(), Collections.<String>emptySet(), now);
+        metadata.update(emptyMetadataResponse(), now);
 
         // The last update was successful so the remaining time to expire the current metadata should be returned.
         assertEquals(largerOfBackoffAndExpire, metadata.timeToNextUpdate(now));
@@ -137,7 +147,7 @@ private static void checkTimeToNextUpdate(long refreshBackoffMs, long metadataEx
         assertEquals(refreshBackoffMs, metadata.timeToNextUpdate(now));
 
         // Reset needUpdate to false.
-        metadata.update(Cluster.empty(), Collections.<String>emptySet(), now);
+        metadata.update(emptyMetadataResponse(), now);
         assertEquals(largerOfBackoffAndExpire, metadata.timeToNextUpdate(now));
 
         // Both metadataExpireMs and refreshBackoffMs elapsed.
@@ -181,13 +191,13 @@ public void testTimeToNextUpdate_OverwriteBackoff() {
         long now = 10000;
 
         // New topic added to fetch set and update requested. It should allow immediate update.
-        metadata.update(Cluster.empty(), Collections.<String>emptySet(), now);
+        metadata.update(emptyMetadataResponse(), now);
         metadata.add("new-topic");
         assertEquals(0, metadata.timeToNextUpdate(now));
 
         // Even though setTopics called, immediate update isn't necessary if the new topic set isn't
         // containing a new topic,
-        metadata.update(Cluster.empty(), Collections.<String>emptySet(), now);
+        metadata.update(emptyMetadataResponse(), now);
         metadata.setTopics(metadata.topics());
         assertEquals(metadataExpireMs, metadata.timeToNextUpdate(now));
 
@@ -196,12 +206,12 @@ public void testTimeToNextUpdate_OverwriteBackoff() {
         assertEquals(0, metadata.timeToNextUpdate(now));
 
         // If metadata requested for all topics it should allow immediate update.
-        metadata.update(Cluster.empty(), Collections.<String>emptySet(), now);
+        metadata.update(emptyMetadataResponse(), now);
         metadata.needMetadataForAllTopics(true);
         assertEquals(0, metadata.timeToNextUpdate(now));
 
         // However if metadata is already capable to serve all topics it shouldn't override backoff.
-        metadata.update(Cluster.empty(), Collections.<String>emptySet(), now);
+        metadata.update(emptyMetadataResponse(), now);
         metadata.needMetadataForAllTopics(true);
         assertEquals(metadataExpireMs, metadata.timeToNextUpdate(now));
     }
@@ -216,7 +226,7 @@ public void testTimeToNextUpdate_OverwriteBackoff() {
     @Test
     public void testMetadataUpdateWaitTime() throws Exception {
         long time = 0;
-        metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
+        metadata.update(emptyMetadataResponse(), time);
         assertFalse("No update needed.", metadata.timeToNextUpdate(time) == 0);
         // first try with a max wait time of 0 and ensure that this returns back without waiting forever
         try {
@@ -238,7 +248,7 @@ public void testMetadataUpdateWaitTime() throws Exception {
     @Test
     public void testFailedUpdate() {
         long time = 100;
-        metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
+        metadata.update(emptyMetadataResponse(), time);
 
         assertEquals(100, metadata.timeToNextUpdate(1000));
         metadata.failedUpdate(1100, null);
@@ -247,26 +257,24 @@ public void testFailedUpdate() {
         assertEquals(100, metadata.lastSuccessfulUpdate());
 
         metadata.needMetadataForAllTopics(true);
-        metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
+        metadata.update(emptyMetadataResponse(), time);
         assertEquals(100, metadata.timeToNextUpdate(1000));
     }
 
     @Test
     public void testUpdateWithNeedMetadataForAllTopics() {
         long time = 0;
-        metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
+        metadata.update(emptyMetadataResponse(), time);
         metadata.needMetadataForAllTopics(true);
 
         final List<String> expectedTopics = Collections.singletonList("topic");
         metadata.setTopics(expectedTopics);
-        metadata.update(new Cluster(null,
-                Collections.singletonList(new Node(0, "host1", 1000)),
-                Arrays.asList(
-                    new PartitionInfo("topic", 0, null, null, null),
-                    new PartitionInfo("topic1", 0, null, null, null)),
-                Collections.<String>emptySet(),
-                Collections.<String>emptySet()),
-            Collections.<String>emptySet(), 100);
+
+        Map<String, Integer> partitionCounts = new HashMap<>();
+        partitionCounts.put("topic", 1);
+        partitionCounts.put("topic1", 1);
+        MetadataResponse metadataResponse = TestUtils.metadataUpdateWith(1, partitionCounts);
+        metadata.update(metadataResponse, 100);
 
         assertArrayEquals("Metadata got updated with wrong set of topics.",
             expectedTopics.toArray(), metadata.topics().toArray());
@@ -283,20 +291,15 @@ public void testClusterListenerGetsNotifiedOfUpdate() {
         metadata = new Metadata(refreshBackoffMs, metadataExpireMs, true, false, listeners);
 
         String hostName = "www.example.com";
-        Cluster cluster = Cluster.bootstrap(Arrays.asList(new InetSocketAddress(hostName, 9002)));
-        metadata.update(cluster, Collections.<String>emptySet(), time);
+        metadata.bootstrap(Collections.singletonList(new InetSocketAddress(hostName, 9002)), time);
         assertFalse("ClusterResourceListener should not called when metadata is updated with bootstrap Cluster",
                 MockClusterResourceListener.IS_ON_UPDATE_CALLED.get());
 
-        metadata.update(new Cluster(
-                        "dummy",
-                        Arrays.asList(new Node(0, "host1", 1000)),
-                        Arrays.asList(
-                                new PartitionInfo("topic", 0, null, null, null),
-                                new PartitionInfo("topic1", 0, null, null, null)),
-                        Collections.<String>emptySet(),
-                        Collections.<String>emptySet()),
-                Collections.<String>emptySet(), 100);
+        Map<String, Integer> partitionCounts = new HashMap<>();
+        partitionCounts.put("topic", 1);
+        partitionCounts.put("topic1", 1);
+        MetadataResponse metadataResponse = TestUtils.metadataUpdateWith("dummy", 1, partitionCounts);
+        metadata.update(metadataResponse, 100);
 
         assertEquals("MockClusterResourceListener did not get cluster metadata correctly",
                 "dummy", mockClusterListener.clusterResource().clusterId());
@@ -308,7 +311,7 @@ public void testClusterListenerGetsNotifiedOfUpdate() {
     public void testListenerGetsNotifiedOfUpdate() {
         long time = 0;
         final Set<String> topics = new HashSet<>();
-        metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
+        metadata.update(emptyMetadataResponse(), time);
         metadata.addListener(new Metadata.Listener() {
             @Override
             public void onMetadataUpdate(Cluster cluster, Set<String> unavailableTopics) {
@@ -317,15 +320,11 @@ public void onMetadataUpdate(Cluster cluster, Set<String> unavailableTopics) {
             }
         });
 
-        metadata.update(new Cluster(
-                null,
-                Arrays.asList(new Node(0, "host1", 1000)),
-                Arrays.asList(
-                    new PartitionInfo("topic", 0, null, null, null),
-                    new PartitionInfo("topic1", 0, null, null, null)),
-                Collections.<String>emptySet(),
-                Collections.<String>emptySet()),
-            Collections.<String>emptySet(), 100);
+        Map<String, Integer> partitionCounts = new HashMap<>();
+        partitionCounts.put("topic", 1);
+        partitionCounts.put("topic1", 1);
+        MetadataResponse metadataResponse = TestUtils.metadataUpdateWith("dummy", 1, partitionCounts);
+        metadata.update(metadataResponse, 100);
 
         assertEquals("Listener did not update topics list correctly",
             new HashSet<>(Arrays.asList("topic", "topic1")), topics);
@@ -335,7 +334,7 @@ public void onMetadataUpdate(Cluster cluster, Set<String> unavailableTopics) {
     public void testListenerCanUnregister() {
         long time = 0;
         final Set<String> topics = new HashSet<>();
-        metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
+        metadata.update(emptyMetadataResponse(), time);
         final Metadata.Listener listener = new Metadata.Listener() {
             @Override
             public void onMetadataUpdate(Cluster cluster, Set<String> unavailableTopics) {
@@ -345,27 +344,19 @@ public void onMetadataUpdate(Cluster cluster, Set<String> unavailableTopics) {
         };
         metadata.addListener(listener);
 
-        metadata.update(new Cluster(
-                "cluster",
-                Collections.singletonList(new Node(0, "host1", 1000)),
-                Arrays.asList(
-                    new PartitionInfo("topic", 0, null, null, null),
-                    new PartitionInfo("topic1", 0, null, null, null)),
-                Collections.<String>emptySet(),
-                Collections.<String>emptySet()),
-            Collections.<String>emptySet(), 100);
+        Map<String, Integer> partitionCounts = new HashMap<>();
+        partitionCounts.put("topic", 1);
+        partitionCounts.put("topic1", 1);
+        MetadataResponse metadataResponse = TestUtils.metadataUpdateWith("dummy", 1, partitionCounts);
+        metadata.update(metadataResponse, 100);
 
         metadata.removeListener(listener);
 
-        metadata.update(new Cluster(
-                "cluster",
-                Arrays.asList(new Node(0, "host1", 1000)),
-                Arrays.asList(
-                    new PartitionInfo("topic2", 0, null, null, null),
-                    new PartitionInfo("topic3", 0, null, null, null)),
-                Collections.<String>emptySet(),
-                Collections.<String>emptySet()),
-            Collections.<String>emptySet(), 100);
+        partitionCounts.clear();
+        partitionCounts.put("topic2", 1);
+        partitionCounts.put("topic3", 1);
+        metadataResponse = TestUtils.metadataUpdateWith("dummy", 1, partitionCounts);
+        metadata.update(metadataResponse, 100);
 
         assertEquals("Listener did not update topics list correctly",
             new HashSet<>(Arrays.asList("topic", "topic1")), topics);
@@ -378,17 +369,17 @@ public void testTopicExpiry() throws Exception {
         // Test that topic is expired if not used within the expiry interval
         long time = 0;
         metadata.add("topic1");
-        metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
+        metadata.update(emptyMetadataResponse(), time);
         time += Metadata.TOPIC_EXPIRY_MS;
-        metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
+        metadata.update(emptyMetadataResponse(), time);
         assertFalse("Unused topic not expired", metadata.containsTopic("topic1"));
 
         // Test that topic is not expired if used within the expiry interval
         metadata.add("topic2");
-        metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
+        metadata.update(emptyMetadataResponse(), time);
         for (int i = 0; i < 3; i++) {
             time += Metadata.TOPIC_EXPIRY_MS / 2;
-            metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
+            metadata.update(emptyMetadataResponse(), time);
             assertTrue("Topic expired even though in use", metadata.containsTopic("topic2"));
             metadata.add("topic2");
         }
@@ -397,9 +388,9 @@ public void testTopicExpiry() throws Exception {
         HashSet<String> topics = new HashSet<>();
         topics.add("topic4");
         metadata.setTopics(topics);
-        metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
+        metadata.update(emptyMetadataResponse(), time);
         time += Metadata.TOPIC_EXPIRY_MS;
-        metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
+        metadata.update(emptyMetadataResponse(), time);
         assertFalse("Unused topic not expired", metadata.containsTopic("topic4"));
     }
 
@@ -410,17 +401,17 @@ public void testNonExpiringMetadata() throws Exception {
         // Test that topic is not expired if not used within the expiry interval
         long time = 0;
         metadata.add("topic1");
-        metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
+        metadata.update(emptyMetadataResponse(), time);
         time += Metadata.TOPIC_EXPIRY_MS;
-        metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
+        metadata.update(emptyMetadataResponse(), time);
         assertTrue("Unused topic expired when expiry disabled", metadata.containsTopic("topic1"));
 
         // Test that topic is not expired if used within the expiry interval
         metadata.add("topic2");
-        metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
+        metadata.update(emptyMetadataResponse(), time);
         for (int i = 0; i < 3; i++) {
             time += Metadata.TOPIC_EXPIRY_MS / 2;
-            metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
+            metadata.update(emptyMetadataResponse(), time);
             assertTrue("Topic expired even though in use", metadata.containsTopic("topic2"));
             metadata.add("topic2");
         }
@@ -430,7 +421,7 @@ public void testNonExpiringMetadata() throws Exception {
         topics.add("topic4");
         metadata.setTopics(topics);
         time += metadataExpireMs * 2;
-        metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
+        metadata.update(emptyMetadataResponse(), time);
         assertTrue("Unused topic expired when expiry disabled", metadata.containsTopic("topic4"));
     }
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index a586af82245..0cfb3be59af 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.clients;
 
-import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.InterruptException;
@@ -24,12 +23,12 @@
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -38,6 +37,7 @@
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.stream.Collectors;
 
 /**
  * A mock network client for use testing code
@@ -73,10 +73,7 @@ public FutureResponse(Node node,
 
     private int correlation;
     private final Time time;
-    private final Metadata metadata;
-    private Set<String> unavailableTopics;
-    private Cluster cluster;
-    private Node node = null;
+    private final MockMetadataUpdater metadataUpdater;
     private final Set<String> ready = new HashSet<>();
 
     // Nodes awaiting reconnect backoff, will not be chosen by leastLoadedNode
@@ -97,14 +94,13 @@ public FutureResponse(Node node,
     private volatile NodeApiVersions nodeApiVersions = NodeApiVersions.create();
     private volatile int numBlockingWakeups = 0;
 
-    public MockClient(Time time) {
-        this(time, null);
+    public MockClient(Time time, Metadata metadata) {
+        this(time, new DefaultMockMetadataUpdater(metadata));
     }
 
-    public MockClient(Time time, Metadata metadata) {
+    public MockClient(Time time, MockMetadataUpdater metadataUpdater) {
         this.time = time;
-        this.metadata = metadata;
-        this.unavailableTopics = Collections.emptySet();
+        this.metadataUpdater = metadataUpdater;
         this.blackedOut = new TransientSet<>(time);
         this.unreachable = new TransientSet<>(time);
         this.delayedReady = new TransientSet<>(time);
@@ -280,21 +276,13 @@ private synchronized void maybeAwaitWakeup() {
         checkTimeoutOfPendingRequests(now);
 
         List<ClientResponse> copy = new ArrayList<>(this.responses);
-        if (metadata != null && metadata.updateRequested()) {
+        // We skip metadata updates if all nodes are currently blacked out
+        if (metadataUpdater.isUpdateNeeded() && leastLoadedNode(now) != null) {
             MetadataUpdate metadataUpdate = metadataUpdates.poll();
-            if (cluster != null)
-                metadata.update(cluster, this.unavailableTopics, time.milliseconds());
-            if (metadataUpdate == null)
-                metadata.update(metadata.fetch(), this.unavailableTopics, time.milliseconds());
-            else {
-                if (metadataUpdate.expectMatchRefreshTopics
-                    && !metadata.topics().equals(metadataUpdate.cluster.topics())) {
-                    throw new IllegalStateException("The metadata topics does not match expectation. "
-                                                        + "Expected topics: " + metadataUpdate.cluster.topics()
-                                                        + ", asked topics: " + metadata.topics());
-                }
-                this.unavailableTopics = metadataUpdate.unavailableTopics;
-                metadata.update(metadataUpdate.cluster, metadataUpdate.unavailableTopics, time.milliseconds());
+            if (metadataUpdate != null) {
+                metadataUpdater.update(time, metadataUpdate);
+            } else {
+                metadataUpdater.updateWithCurrentMetadata(time);
             }
         }
 
@@ -310,6 +298,7 @@ private long elapsedTimeMs(long currentTimeMs, long startTimeMs) {
         return Math.max(0, currentTimeMs - startTimeMs);
     }
 
+
     private void checkTimeoutOfPendingRequests(long nowMs) {
         ClientRequest request = requests.peek();
         while (request != null && elapsedTimeMs(nowMs, request.createdTimeMs()) > request.requestTimeoutMs()) {
@@ -350,9 +339,9 @@ public void respondToRequest(ClientRequest clientRequest, AbstractResponse respo
 
 
     public void respond(AbstractResponse response, boolean disconnected) {
-        ClientRequest request = null;
-        if (requests.size() > 0)
-            request = requests.remove();
+        if (requests.isEmpty())
+            throw new IllegalStateException("No requests pending for inbound response " + response);
+        ClientRequest request = requests.poll();
         short version = request.requestBuilder().latestAllowedVersion();
         responses.add(new ClientResponse(request.makeHeader(version), request.callback(), request.destination(),
                 request.createdTimeMs(), time.milliseconds(), disconnected, null, null, response));
@@ -463,22 +452,17 @@ public int numAwaitingResponses() {
         return futureResponses.size();
     }
 
-    public void prepareMetadataUpdate(Cluster cluster, Set<String> unavailableTopics) {
-        metadataUpdates.add(new MetadataUpdate(cluster, unavailableTopics, false));
+    public void prepareMetadataUpdate(MetadataResponse updateResponse) {
+        prepareMetadataUpdate(updateResponse, false);
     }
 
-    public void prepareMetadataUpdate(Cluster cluster,
-                                      Set<String> unavailableTopics,
+    public void prepareMetadataUpdate(MetadataResponse updateResponse,
                                       boolean expectMatchMetadataTopics) {
-        metadataUpdates.add(new MetadataUpdate(cluster, unavailableTopics, expectMatchMetadataTopics));
-    }
-
-    public void setNode(Node node) {
-        this.node = node;
+        metadataUpdates.add(new MetadataUpdate(updateResponse, expectMatchMetadataTopics));
     }
 
-    public void cluster(Cluster cluster) {
-        this.cluster = cluster;
+    public void updateMetadata(MetadataResponse updateResponse) {
+        metadataUpdater.update(time, new MetadataUpdate(updateResponse, false));
     }
 
     @Override
@@ -534,7 +518,7 @@ public ClientRequest newClientRequest(String nodeId,
 
     @Override
     public void close() {
-        metadata.close();
+        metadataUpdater.close();
     }
 
     @Override
@@ -545,9 +529,11 @@ public void close(String node) {
     @Override
     public Node leastLoadedNode(long now) {
         // Consistent with NetworkClient, we do not return nodes awaiting reconnect backoff
-        if (blackedOut.contains(node, now))
-            return null;
-        return this.node;
+        for (Node node : metadataUpdater.fetchNodes()) {
+            if (!blackedOut.contains(node, now))
+                return node;
+        }
+        return null;
     }
 
     /**
@@ -564,15 +550,20 @@ public void setNodeApiVersions(NodeApiVersions nodeApiVersions) {
         this.nodeApiVersions = nodeApiVersions;
     }
 
-    private static class MetadataUpdate {
-        final Cluster cluster;
-        final Set<String> unavailableTopics;
+    public static class MetadataUpdate {
+        final MetadataResponse updateResponse;
         final boolean expectMatchRefreshTopics;
-        MetadataUpdate(Cluster cluster, Set<String> unavailableTopics, boolean expectMatchRefreshTopics) {
-            this.cluster = cluster;
-            this.unavailableTopics = unavailableTopics;
+
+        MetadataUpdate(MetadataResponse updateResponse, boolean expectMatchRefreshTopics) {
+            this.updateResponse = updateResponse;
             this.expectMatchRefreshTopics = expectMatchRefreshTopics;
         }
+
+        private Set<String> topics() {
+            return updateResponse.topicMetadata().stream()
+                    .map(MetadataResponse.TopicMetadata::topic)
+                    .collect(Collectors.toSet());
+        }
     }
 
     private static class TransientSet<T> {
@@ -614,4 +605,64 @@ void clear() {
 
     }
 
+    /**
+     * This is a dumbed down version of {@link MetadataUpdater} which is used to facilitate
+     * metadata tracking primarily in order to serve {@link KafkaClient#leastLoadedNode(long)}
+     * and bookkeeping through {@link Metadata}. The extensibility allows AdminClient, which does
+     * not rely on {@link Metadata} to do its own thing.
+     */
+    public interface MockMetadataUpdater {
+        List<Node> fetchNodes();
+
+        boolean isUpdateNeeded();
+
+        void update(Time time, MetadataUpdate update);
+
+        default void updateWithCurrentMetadata(Time time) {}
+
+        default void close() {}
+    }
+
+    private static class DefaultMockMetadataUpdater implements MockMetadataUpdater {
+        private final Metadata metadata;
+        private MetadataUpdate lastUpdate;
+
+        public DefaultMockMetadataUpdater(Metadata metadata) {
+            this.metadata = metadata;
+        }
+
+        @Override
+        public List<Node> fetchNodes() {
+            return metadata.fetch().nodes();
+        }
+
+        @Override
+        public boolean isUpdateNeeded() {
+            return metadata.updateRequested();
+        }
+
+        @Override
+        public void updateWithCurrentMetadata(Time time) {
+            if (lastUpdate == null)
+                throw new IllegalStateException("No previous metadata update to use");
+            update(time, lastUpdate);
+        }
+
+        @Override
+        public void update(Time time, MetadataUpdate update) {
+            if (update.expectMatchRefreshTopics && !metadata.topics().equals(update.topics())) {
+                throw new IllegalStateException("The metadata topics does not match expectation. "
+                        + "Expected topics: " + update.topics()
+                        + ", asked topics: " + metadata.topics());
+            }
+            metadata.update(update.updateResponse, time.milliseconds());
+            this.lastUpdate = update;
+        }
+
+        @Override
+        public void close() {
+            metadata.close();
+        }
+    }
+
 }
diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index 8abe9a40085..5a4e1e8c8c8 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.clients;
 
-import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.network.NetworkReceive;
@@ -27,6 +26,7 @@
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.requests.ApiVersionsResponse;
 import org.apache.kafka.common.requests.MetadataRequest;
+import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.requests.ProduceRequest;
 import org.apache.kafka.common.requests.ResponseHeader;
 import org.apache.kafka.common.utils.LogContext;
@@ -56,9 +56,9 @@
     protected final MockTime time = new MockTime();
     protected final MockSelector selector = new MockSelector(time);
     protected final Metadata metadata = new Metadata(0, Long.MAX_VALUE, true);
-    protected final int nodeId = 1;
-    protected final Cluster cluster = TestUtils.singletonCluster("test", nodeId);
-    protected final Node node = cluster.nodes().get(0);
+    protected final MetadataResponse initialMetadataResponse = TestUtils.metadataUpdateWith(1,
+            Collections.singletonMap("test", 1));
+    protected final Node node = initialMetadataResponse.brokers().iterator().next();
     protected final long reconnectBackoffMsTest = 10 * 1000;
     protected final long reconnectBackoffMaxMsTest = 10 * 10000;
 
@@ -89,7 +89,7 @@ private NetworkClient createNetworkClientWithNoVersionDiscovery() {
     @Before
     public void setup() {
         selector.reset();
-        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
+        metadata.update(initialMetadataResponse, time.milliseconds());
     }
 
     @Test(expected = IllegalStateException.class)
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java
index e77b48b4cf3..89f5fdec2a5 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java
@@ -17,12 +17,14 @@
 package org.apache.kafka.clients.admin;
 
 import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.clients.admin.internals.AdminMetadataManager;
 import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 
-import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -42,38 +44,45 @@
     private final MockClient mockClient;
     private final KafkaAdminClient adminClient;
 
-    public AdminClientUnitTestEnv(Cluster cluster, String...vals) {
+    public AdminClientUnitTestEnv(Cluster cluster, String... vals) {
         this(Time.SYSTEM, cluster, vals);
     }
 
-    public AdminClientUnitTestEnv(Time time, Cluster cluster, String...vals) {
+    public AdminClientUnitTestEnv(Time time, Cluster cluster, String... vals) {
         this(time, cluster, newStrMap(vals));
     }
 
-    public AdminClientUnitTestEnv(Time time, Cluster cluster, Map<String, Object> config) {
-        this(newMockClient(time, cluster), time, cluster, config);
-    }
-
-    public AdminClientUnitTestEnv(MockClient mockClient, Time time, Cluster cluster) {
-        this(mockClient, time, cluster, newStrMap());
-    }
-
-    private static MockClient newMockClient(Time time, Cluster cluster) {
-        MockClient mockClient = new MockClient(time);
-        mockClient.prepareResponse(new MetadataResponse(cluster.nodes(),
-            cluster.clusterResource().clusterId(),
-            cluster.controller() == null ? MetadataResponse.NO_CONTROLLER_ID : cluster.controller().id(),
-            Collections.emptyList()));
-        return mockClient;
+    public AdminClientUnitTestEnv(Time time, Cluster cluster) {
+        this(time, cluster, newStrMap());
     }
 
-    public AdminClientUnitTestEnv(MockClient mockClient, Time time, Cluster cluster,
-                                  Map<String, Object> config) {
+    public AdminClientUnitTestEnv(Time time, Cluster cluster, Map<String, Object> config) {
         this.time = time;
         this.cluster = cluster;
         AdminClientConfig adminClientConfig = new AdminClientConfig(config);
-        this.mockClient = mockClient;
-        this.adminClient = KafkaAdminClient.createInternal(adminClientConfig, mockClient, time);
+
+        AdminMetadataManager metadataManager = new AdminMetadataManager(new LogContext(),
+                adminClientConfig.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG),
+                adminClientConfig.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG));
+        this.mockClient = new MockClient(time, new MockClient.MockMetadataUpdater() {
+            @Override
+            public List<Node> fetchNodes() {
+                return cluster.nodes();
+            }
+
+            @Override
+            public boolean isUpdateNeeded() {
+                return false;
+            }
+
+            @Override
+            public void update(Time time, MockClient.MetadataUpdate update) {
+                throw new UnsupportedOperationException();
+            }
+        });
+
+        metadataManager.update(cluster, time.milliseconds());
+        this.adminClient = KafkaAdminClient.createInternal(adminClientConfig, metadataManager, mockClient, time);
     }
 
     public Time time() {
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index dc55ea236e6..1f62d39a963 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.clients.admin;
 
+import org.apache.kafka.clients.ClientDnsLookup;
+import org.apache.kafka.clients.ClientUtils;
 import org.apache.kafka.clients.MockClient;
 import org.apache.kafka.clients.NodeApiVersions;
 import org.apache.kafka.clients.admin.DeleteAclsResult.FilterResults;
@@ -191,6 +193,11 @@ private static Cluster mockCluster(int controllerIndex) {
                 Collections.emptySet(), nodes.get(controllerIndex));
     }
 
+    private static Cluster mockBootstrapCluster() {
+        return Cluster.bootstrap(ClientUtils.parseAndValidateAddresses(
+                Collections.singletonList("localhost:8121"), ClientDnsLookup.DEFAULT));
+    }
+
     private static AdminClientUnitTestEnv mockClientEnv(String... configVals) {
         return new AdminClientUnitTestEnv(mockCluster(0), configVals);
     }
@@ -206,15 +213,9 @@ public void testCloseAdminClient() {
      */
     @Test
     public void testTimeoutWithoutMetadata() throws Exception {
-        Cluster cluster = mockCluster(0);
-        MockClient mockClient = new MockClient(Time.SYSTEM);
-        try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockClient,
-                Time.SYSTEM,
-                cluster,
-                newStrMap(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:8121",
-                    AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "10"))) {
+        try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(Time.SYSTEM, mockBootstrapCluster(),
+                newStrMap(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "10"))) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
-            env.kafkaClient().setNode(new Node(0, "localhost", 8121));
             env.kafkaClient().prepareResponse(new CreateTopicsResponse(Collections.singletonMap("myTopic", new ApiError(Errors.NONE, ""))));
             KafkaFuture<Void> future = env.adminClient().createTopics(
                     Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(0, asList(0, 1, 2)))),
@@ -228,16 +229,13 @@ public void testConnectionFailureOnMetadataUpdate() throws Exception {
         // This tests the scenario in which we successfully connect to the bootstrap server, but
         // the server disconnects before sending the full response
 
-        Cluster cluster = Cluster.bootstrap(Collections.singletonList(new InetSocketAddress("localhost", 8121)));
-        MockClient mockClient = new MockClient(Time.SYSTEM);
-        mockClient.setNodeApiVersions(NodeApiVersions.create());
-        mockClient.setNode(cluster.nodes().get(0));
-
-        try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockClient, Time.SYSTEM, cluster)) {
+        Cluster cluster = mockBootstrapCluster();
+        try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(Time.SYSTEM, cluster)) {
             Cluster discoveredCluster = mockCluster(0);
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
             env.kafkaClient().prepareResponse(request -> request instanceof MetadataRequest, null, true);
-            env.kafkaClient().prepareResponse(body -> body instanceof MetadataRequest,
-                    new  MetadataResponse(discoveredCluster.nodes(), discoveredCluster.clusterResource().clusterId(),
+            env.kafkaClient().prepareResponse(request -> request instanceof MetadataRequest,
+                    new MetadataResponse(discoveredCluster.nodes(), discoveredCluster.clusterResource().clusterId(),
                             1, Collections.emptyList()));
             env.kafkaClient().prepareResponse(body -> body instanceof CreateTopicsRequest,
                     new CreateTopicsResponse(Collections.singletonMap("myTopic", new ApiError(Errors.NONE, ""))));
@@ -256,13 +254,10 @@ public void testUnreachableBootstrapServer() throws Exception {
         // which prevents AdminClient from being able to send the initial metadata request
 
         Cluster cluster = Cluster.bootstrap(Collections.singletonList(new InetSocketAddress("localhost", 8121)));
-        MockClient mockClient = new MockClient(Time.SYSTEM);
-        mockClient.setNodeApiVersions(NodeApiVersions.create());
-        mockClient.setNode(cluster.nodes().get(0));
-        mockClient.setUnreachable(cluster.nodes().get(0), 200);
-
-        try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockClient, Time.SYSTEM, cluster)) {
+        try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(Time.SYSTEM, cluster)) {
             Cluster discoveredCluster = mockCluster(0);
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().setUnreachable(cluster.nodes().get(0), 200);
             env.kafkaClient().prepareResponse(body -> body instanceof MetadataRequest,
                     new  MetadataResponse(discoveredCluster.nodes(), discoveredCluster.clusterResource().clusterId(),
                             1, Collections.emptyList()));
@@ -283,16 +278,12 @@ public void testUnreachableBootstrapServer() throws Exception {
     @Test
     public void testPropagatedMetadataFetchException() throws Exception {
         Cluster cluster = mockCluster(0);
-        MockClient mockClient = new MockClient(Time.SYSTEM);
-        mockClient.createPendingAuthenticationError(cluster.nodeById(0),
-            TimeUnit.DAYS.toMillis(1));
-        try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockClient,
-                Time.SYSTEM,
-                cluster,
+        try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(Time.SYSTEM, cluster,
                 newStrMap(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:8121",
                     AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "10"))) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
-            env.kafkaClient().setNode(env.cluster().nodeById(0));
+            env.kafkaClient().createPendingAuthenticationError(cluster.nodeById(0),
+                    TimeUnit.DAYS.toMillis(1));
             env.kafkaClient().prepareResponse(new CreateTopicsResponse(Collections.singletonMap("myTopic", new ApiError(Errors.NONE, ""))));
             KafkaFuture<Void> future = env.adminClient().createTopics(
                 Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(0, asList(0, 1, 2)))),
@@ -305,7 +296,6 @@ public void testPropagatedMetadataFetchException() throws Exception {
     public void testCreateTopics() throws Exception {
         try (AdminClientUnitTestEnv env = mockClientEnv()) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
-            env.kafkaClient().setNode(env.cluster().controller());
             env.kafkaClient().prepareResponse(body -> body instanceof CreateTopicsRequest,
                     new CreateTopicsResponse(Collections.singletonMap("myTopic", new ApiError(Errors.NONE, ""))));
             KafkaFuture<Void> future = env.adminClient().createTopics(
@@ -319,30 +309,26 @@ public void testCreateTopics() throws Exception {
     public void testCreateTopicsRetryBackoff() throws Exception {
         Cluster cluster = mockCluster(0);
         MockTime time = new MockTime();
-        MockClient mockClient = new MockClient(time);
         int retryBackoff = 100;
 
-        mockClient.prepareResponse(body -> body instanceof MetadataRequest,
-                new MetadataResponse(cluster.nodes(), cluster.clusterResource().clusterId(),
-                        1, Collections.emptyList()));
+        try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, cluster,
+                newStrMap(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG, "" + retryBackoff))) {
+            MockClient mockClient = env.kafkaClient();
 
-        AtomicLong firstAttemptTime = new AtomicLong(0);
-        AtomicLong secondAttemptTime = new AtomicLong(0);
+            mockClient.setNodeApiVersions(NodeApiVersions.create());
 
-        mockClient.prepareResponse(body -> {
-            firstAttemptTime.set(time.milliseconds());
-            return body instanceof CreateTopicsRequest;
-        }, null, true);
+            AtomicLong firstAttemptTime = new AtomicLong(0);
+            AtomicLong secondAttemptTime = new AtomicLong(0);
 
-        mockClient.prepareResponse(body -> {
-            secondAttemptTime.set(time.milliseconds());
-            return body instanceof CreateTopicsRequest;
-        }, new CreateTopicsResponse(Collections.singletonMap("myTopic", new ApiError(Errors.NONE, ""))));
+            mockClient.prepareResponse(body -> {
+                firstAttemptTime.set(time.milliseconds());
+                return body instanceof CreateTopicsRequest;
+            }, null, true);
 
-        try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockClient, time, cluster,
-                newStrMap(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG, "" + retryBackoff))) {
-            mockClient.setNodeApiVersions(NodeApiVersions.create());
-            mockClient.setNode(env.cluster().controller());
+            mockClient.prepareResponse(body -> {
+                secondAttemptTime.set(time.milliseconds());
+                return body instanceof CreateTopicsRequest;
+            }, new CreateTopicsResponse(Collections.singletonMap("myTopic", new ApiError(Errors.NONE, ""))));
 
             KafkaFuture<Void> future = env.adminClient().createTopics(
                     Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(0, asList(0, 1, 2)))),
@@ -354,18 +340,17 @@ public void testCreateTopicsRetryBackoff() throws Exception {
             time.sleep(retryBackoff);
 
             future.get();
-        }
 
-        long actualRetryBackoff = secondAttemptTime.get() - firstAttemptTime.get();
-        assertEquals("CreateTopics retry did not await expected backoff",
-                retryBackoff, actualRetryBackoff);
+            long actualRetryBackoff = secondAttemptTime.get() - firstAttemptTime.get();
+            assertEquals("CreateTopics retry did not await expected backoff",
+                    retryBackoff, actualRetryBackoff);
+        }
     }
 
     @Test
     public void testCreateTopicsHandleNotControllerException() throws Exception {
         try (AdminClientUnitTestEnv env = mockClientEnv()) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
-            env.kafkaClient().setNode(env.cluster().nodeById(0));
             env.kafkaClient().prepareResponseFrom(new CreateTopicsResponse(
                 Collections.singletonMap("myTopic", new ApiError(Errors.NOT_CONTROLLER, ""))),
                 env.cluster().nodeById(0));
@@ -387,7 +372,6 @@ public void testCreateTopicsHandleNotControllerException() throws Exception {
     public void testDeleteTopics() throws Exception {
         try (AdminClientUnitTestEnv env = mockClientEnv()) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
-            env.kafkaClient().setNode(env.cluster().controller());
 
             env.kafkaClient().prepareResponse(body -> body instanceof DeleteTopicsRequest,
                     new DeleteTopicsResponse(Collections.singletonMap("myTopic", Errors.NONE)));
@@ -413,7 +397,6 @@ public void testDeleteTopics() throws Exception {
     public void testInvalidTopicNames() throws Exception {
         try (AdminClientUnitTestEnv env = mockClientEnv()) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
-            env.kafkaClient().setNode(env.cluster().controller());
 
             List<String> sillyTopicNames = asList("", null);
             Map<String, KafkaFuture<Void>> deleteFutures = env.adminClient().deleteTopics(sillyTopicNames).values();
@@ -447,12 +430,10 @@ public void testMetadataRetries() throws Exception {
         // We should continue retrying on metadata update failures in spite of retry configuration
 
         String topic = "topic";
-        MockClient mockClient = new MockClient(Time.SYSTEM);
         Cluster bootstrapCluster = Cluster.bootstrap(singletonList(new InetSocketAddress("localhost", 9999)));
         Cluster initializedCluster = mockCluster(0);
-        mockClient.setNode(bootstrapCluster.nodes().get(0));
 
-        try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockClient, Time.SYSTEM, bootstrapCluster,
+        try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(Time.SYSTEM, bootstrapCluster,
                 newStrMap(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999",
                         AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000000",
                         AdminClientConfig.RETRIES_CONFIG, "0"))) {
@@ -484,17 +465,12 @@ public void testMetadataRetries() throws Exception {
 
     @Test
     public void testAdminClientApisAuthenticationFailure() throws Exception {
-        Cluster cluster = mockCluster(0);
-        MockClient mockClient = new MockClient(Time.SYSTEM);
-        mockClient.createPendingAuthenticationError(cluster.nodeById(0),
-            TimeUnit.DAYS.toMillis(1));
-        try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockClient,
-                Time.SYSTEM,
-                cluster,
-                newStrMap(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:8121",
-                     AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000"))) {
+        Cluster cluster = mockBootstrapCluster();
+        try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(Time.SYSTEM, cluster,
+                newStrMap(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000"))) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
-            env.kafkaClient().setNode(env.cluster().controller());
+            env.kafkaClient().createPendingAuthenticationError(cluster.nodes().get(0),
+                    TimeUnit.DAYS.toMillis(1));
             callAdminClientApisAndExpectAnAuthenticationError(env);
         }
     }
@@ -570,7 +546,6 @@ private void callAdminClientApisAndExpectAnAuthenticationError(AdminClientUnitTe
     public void testDescribeAcls() throws Exception {
         try (AdminClientUnitTestEnv env = mockClientEnv()) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
-            env.kafkaClient().setNode(env.cluster().controller());
 
             // Test a call where we get back ACL1 and ACL2.
             env.kafkaClient().prepareResponse(new DescribeAclsResponse(0, ApiError.NONE,
@@ -597,7 +572,6 @@ public void testDescribeAcls() throws Exception {
     public void testCreateAcls() throws Exception {
         try (AdminClientUnitTestEnv env = mockClientEnv()) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
-            env.kafkaClient().setNode(env.cluster().controller());
 
             // Test a call where we successfully create two ACLs.
             env.kafkaClient().prepareResponse(new CreateAclsResponse(0,
@@ -625,7 +599,6 @@ public void testCreateAcls() throws Exception {
     public void testDeleteAcls() throws Exception {
         try (AdminClientUnitTestEnv env = mockClientEnv()) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
-            env.kafkaClient().setNode(env.cluster().controller());
 
             // Test a call where one filter has an error.
             env.kafkaClient().prepareResponse(new DeleteAclsResponse(0, asList(
@@ -677,7 +650,6 @@ public void testHandleTimeout() throws Exception {
             AdminClientConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, "1",
                 AdminClientConfig.RECONNECT_BACKOFF_MS_CONFIG, "1")) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
-            env.kafkaClient().setNode(nodes.get(0));
             assertEquals(time, env.time());
             assertEquals(env.time(), ((KafkaAdminClient) env.adminClient()).time());
 
@@ -718,7 +690,6 @@ public boolean conditionMet() {
     public void testDescribeConfigs() throws Exception {
         try (AdminClientUnitTestEnv env = mockClientEnv()) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
-            env.kafkaClient().setNode(env.cluster().controller());
             env.kafkaClient().prepareResponse(new DescribeConfigsResponse(0,
                 Collections.singletonMap(new ConfigResource(ConfigResource.Type.BROKER, "0"),
                     new DescribeConfigsResponse.Config(ApiError.NONE, Collections.emptySet()))));
@@ -732,7 +703,6 @@ public void testDescribeConfigs() throws Exception {
     public void testCreatePartitions() throws Exception {
         try (AdminClientUnitTestEnv env = mockClientEnv()) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
-            env.kafkaClient().setNode(env.cluster().controller());
 
             Map<String, ApiError> m = new HashMap<>();
             m.put("my_topic", ApiError.NONE);
@@ -784,7 +754,6 @@ public void testDeleteRecords() throws Exception {
 
         try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
-            env.kafkaClient().setNode(env.cluster().nodes().get(0));
 
             Map<TopicPartition, DeleteRecordsResponse.PartitionResponse> m = new HashMap<>();
             m.put(myTopicPartition0,
@@ -889,7 +858,6 @@ public void testListConsumerGroups() throws Exception {
 
         try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
-            env.kafkaClient().setNode(env.cluster().controller());
 
             // Empty metadata response should be retried
             env.kafkaClient().prepareResponse(
@@ -991,7 +959,6 @@ public void testListConsumerGroupsMetadataFailure() throws Exception {
         try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, cluster,
                 AdminClientConfig.RETRIES_CONFIG, "0")) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
-            env.kafkaClient().setNode(env.cluster().controller());
 
             // Empty metadata causes the request to fail since we have no list of brokers
             // to send the ListGroups requests to
@@ -1022,7 +989,6 @@ public void testDescribeConsumerGroups() throws Exception {
 
         try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
-            env.kafkaClient().setNode(env.cluster().controller());
 
             env.kafkaClient().prepareResponse(new FindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
 
@@ -1085,7 +1051,6 @@ public void testDescribeConsumerGroupOffsets() throws Exception {
 
         try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
-            env.kafkaClient().setNode(env.cluster().controller());
 
             env.kafkaClient().prepareResponse(new FindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
 
@@ -1129,7 +1094,6 @@ public void testDeleteConsumerGroups() throws Exception {
 
         try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
-            env.kafkaClient().setNode(env.cluster().controller());
 
             //Retriable FindCoordinatorResponse errors should be retried
             env.kafkaClient().prepareResponse(new FindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE,  Node.noNode()));
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 4ac5876c5f0..89fca844262 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -32,7 +32,6 @@
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
-import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.InterruptException;
@@ -59,6 +58,7 @@
 import org.apache.kafka.common.requests.LeaveGroupResponse;
 import org.apache.kafka.common.requests.ListOffsetRequest;
 import org.apache.kafka.common.requests.ListOffsetResponse;
+import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.requests.OffsetCommitRequest;
 import org.apache.kafka.common.requests.OffsetCommitResponse;
 import org.apache.kafka.common.requests.OffsetFetchResponse;
@@ -361,14 +361,12 @@ public void testPause() {
     @Test
     public void verifyHeartbeatSent() throws Exception {
         Time time = new MockTime();
-        Cluster cluster = TestUtils.singletonCluster(topic, 1);
-        Node node = cluster.nodes().get(0);
-
         Metadata metadata = createMetadata();
-        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
-
         MockClient client = new MockClient(time, metadata);
-        client.setNode(node);
+
+        initMetadata(client, Collections.singletonMap(topic, 1));
+        Node node = metadata.fetch().nodes().get(0);
+
         PartitionAssignor assignor = new RoundRobinAssignor();
 
         KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, true);
@@ -396,14 +394,11 @@ public void verifyHeartbeatSent() throws Exception {
     @Test
     public void verifyHeartbeatSentWhenFetchedDataReady() throws Exception {
         Time time = new MockTime();
-        Cluster cluster = TestUtils.singletonCluster(topic, 1);
-        Node node = cluster.nodes().get(0);
-
         Metadata metadata = createMetadata();
-        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
-
         MockClient client = new MockClient(time, metadata);
-        client.setNode(node);
+
+        initMetadata(client, Collections.singletonMap(topic, 1));
+        Node node = metadata.fetch().nodes().get(0);
         PartitionAssignor assignor = new RoundRobinAssignor();
 
         KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, true);
@@ -430,16 +425,14 @@ public void verifyHeartbeatSentWhenFetchedDataReady() throws Exception {
     }
 
     @Test
-    public void verifyPollTimesOutDuringMetadataUpdate() throws Exception {
+    public void verifyPollTimesOutDuringMetadataUpdate() {
         final Time time = new MockTime();
-        final Cluster cluster = TestUtils.singletonCluster(topic, 1);
-        final Node node = cluster.nodes().get(0);
+        Metadata metadata = createMetadata();
+        final MockClient client = new MockClient(time, metadata);
 
-        final Metadata metadata = createMetadata();
-        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
+        initMetadata(client, Collections.singletonMap(topic, 1));
+        Node node = metadata.fetch().nodes().get(0);
 
-        final MockClient client = new MockClient(time, metadata);
-        client.setNode(node);
         final PartitionAssignor assignor = new RoundRobinAssignor();
 
         final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, true);
@@ -454,16 +447,14 @@ public void verifyPollTimesOutDuringMetadataUpdate() throws Exception {
     }
 
     @Test
-    public void verifyDeprecatedPollDoesNotTimeOutDuringMetadataUpdate() throws Exception {
+    public void verifyDeprecatedPollDoesNotTimeOutDuringMetadataUpdate() {
         final Time time = new MockTime();
-        final Cluster cluster = TestUtils.singletonCluster(topic, 1);
-        final Node node = cluster.nodes().get(0);
+        Metadata metadata = createMetadata();
+        final MockClient client = new MockClient(time, metadata);
 
-        final Metadata metadata = createMetadata();
-        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
+        initMetadata(client, Collections.singletonMap(topic, 1));
+        Node node = metadata.fetch().nodes().get(0);
 
-        final MockClient client = new MockClient(time, metadata);
-        client.setNode(node);
         final PartitionAssignor assignor = new RoundRobinAssignor();
 
         final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, true);
@@ -483,14 +474,10 @@ public void verifyDeprecatedPollDoesNotTimeOutDuringMetadataUpdate() throws Exce
     @Test
     public void verifyNoCoordinatorLookupForManualAssignmentWithSeek() {
         Time time = new MockTime();
-        Cluster cluster = TestUtils.singletonCluster(topic, 1);
-        Node node = cluster.nodes().get(0);
-
         Metadata metadata = createMetadata();
-        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
-
         MockClient client = new MockClient(time, metadata);
-        client.setNode(node);
+
+        initMetadata(client, Collections.singletonMap(topic, 1));
         PartitionAssignor assignor = new RoundRobinAssignor();
 
         KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, true);
@@ -514,14 +501,9 @@ public void testFetchProgressWithMissingPartitionPosition() {
         // a reset on another partition.
 
         Time time = new MockTime();
-        Cluster cluster = TestUtils.singletonCluster(topic, 2);
-        Node node = cluster.nodes().get(0);
-
         Metadata metadata = createMetadata();
-        metadata.update(cluster, Collections.emptySet(), time.milliseconds());
-
         MockClient client = new MockClient(time, metadata);
-        client.setNode(node);
+        initMetadata(client, Collections.singletonMap(topic, 2));
 
         KafkaConsumer<String, String> consumer = newConsumerNoAutoCommit(time, client, metadata);
         consumer.assign(Arrays.asList(tp0, tp1));
@@ -555,17 +537,20 @@ public boolean matches(AbstractRequest body) {
         assertEquals(singleton(tp0), records.partitions());
     }
 
+    private void initMetadata(MockClient mockClient, Map<String, Integer> partitionCounts) {
+        MetadataResponse initialMetadata = TestUtils.metadataUpdateWith(1, partitionCounts);
+        mockClient.updateMetadata(initialMetadata);
+    }
+
     @Test(expected = NoOffsetForPartitionException.class)
     public void testMissingOffsetNoResetPolicy() {
         Time time = new MockTime();
-        Cluster cluster = TestUtils.singletonCluster(topic, 1);
-        Node node = cluster.nodes().get(0);
-
         Metadata metadata = createMetadata();
-        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
-
         MockClient client = new MockClient(time, metadata);
-        client.setNode(node);
+
+        initMetadata(client, Collections.singletonMap(topic, 1));
+        Node node = metadata.fetch().nodes().get(0);
+
         PartitionAssignor assignor = new RoundRobinAssignor();
 
         KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
@@ -583,14 +568,12 @@ public void testMissingOffsetNoResetPolicy() {
     @Test
     public void testResetToCommittedOffset() {
         Time time = new MockTime();
-        Cluster cluster = TestUtils.singletonCluster(topic, 1);
-        Node node = cluster.nodes().get(0);
-
         Metadata metadata = createMetadata();
-        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
-
         MockClient client = new MockClient(time, metadata);
-        client.setNode(node);
+
+        initMetadata(client, Collections.singletonMap(topic, 1));
+        Node node = metadata.fetch().nodes().get(0);
+
         PartitionAssignor assignor = new RoundRobinAssignor();
 
         KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
@@ -609,14 +592,12 @@ public void testResetToCommittedOffset() {
     @Test
     public void testResetUsingAutoResetPolicy() {
         Time time = new MockTime();
-        Cluster cluster = TestUtils.singletonCluster(topic, 1);
-        Node node = cluster.nodes().get(0);
-
         Metadata metadata = createMetadata();
-        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
-
         MockClient client = new MockClient(time, metadata);
-        client.setNode(node);
+
+        initMetadata(client, Collections.singletonMap(topic, 1));
+        Node node = metadata.fetch().nodes().get(0);
+
         PartitionAssignor assignor = new RoundRobinAssignor();
 
         KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
@@ -640,14 +621,12 @@ public void testCommitsFetchedDuringAssign() {
         long offset2 = 20000;
 
         Time time = new MockTime();
-        Cluster cluster = TestUtils.singletonCluster(topic, 2);
-        Node node = cluster.nodes().get(0);
-
         Metadata metadata = createMetadata();
-        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
-
         MockClient client = new MockClient(time, metadata);
-        client.setNode(node);
+
+        initMetadata(client, Collections.singletonMap(topic, 2));
+        Node node = metadata.fetch().nodes().get(0);
+
         PartitionAssignor assignor = new RoundRobinAssignor();
 
         KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, true);
@@ -679,14 +658,12 @@ public void testCommitsFetchedDuringAssign() {
     @Test
     public void testAutoCommitSentBeforePositionUpdate() {
         Time time = new MockTime();
-        Cluster cluster = TestUtils.singletonCluster(topic, 1);
-        Node node = cluster.nodes().get(0);
-
         Metadata metadata = createMetadata();
-        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
-
         MockClient client = new MockClient(time, metadata);
-        client.setNode(node);
+
+        initMetadata(client, Collections.singletonMap(topic, 1));
+        Node node = metadata.fetch().nodes().get(0);
+
         PartitionAssignor assignor = new RoundRobinAssignor();
 
         KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, true);
@@ -717,17 +694,15 @@ public void testAutoCommitSentBeforePositionUpdate() {
     public void testRegexSubscription() {
         String unmatchedTopic = "unmatched";
         Time time = new MockTime();
-
-        Map<String, Integer> topicMetadata = new HashMap<>();
-        topicMetadata.put(topic, 1);
-        topicMetadata.put(unmatchedTopic, 1);
-
-        Cluster cluster = TestUtils.clusterWith(1, topicMetadata);
         Metadata metadata = createMetadata();
-        Node node = cluster.nodes().get(0);
-
         MockClient client = new MockClient(time, metadata);
-        client.setNode(node);
+
+        Map<String, Integer> partitionCounts = new HashMap<>();
+        partitionCounts.put(topic, 1);
+        partitionCounts.put(unmatchedTopic, 1);
+        initMetadata(client, partitionCounts);
+        Node node = metadata.fetch().nodes().get(0);
+
         PartitionAssignor assignor = new RoundRobinAssignor();
 
         KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, true);
@@ -735,7 +710,7 @@ public void testRegexSubscription() {
 
         consumer.subscribe(Pattern.compile(topic), getConsumerRebalanceListener(consumer));
 
-        client.prepareMetadataUpdate(cluster, Collections.<String>emptySet());
+        client.prepareMetadataUpdate(TestUtils.metadataUpdateWith(1, partitionCounts));
 
         consumer.updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE));
 
@@ -752,20 +727,14 @@ public void testChangingRegexSubscription() {
         TopicPartition otherTopicPartition = new TopicPartition(otherTopic, 0);
 
         Time time = new MockTime();
-
-        Map<String, Integer> topicMetadata = new HashMap<>();
-        topicMetadata.put(topic, 1);
-        topicMetadata.put(otherTopic, 1);
-
-        Cluster cluster = TestUtils.clusterWith(1, topicMetadata);
         Metadata metadata = createMetadata();
-        Node node = cluster.nodes().get(0);
-
         MockClient client = new MockClient(time, metadata);
-        client.setNode(node);
-        client.cluster(cluster);
 
-        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
+        Map<String, Integer> partitionCounts = new HashMap<>();
+        partitionCounts.put(topic, 1);
+        partitionCounts.put(otherTopic, 1);
+        initMetadata(client, partitionCounts);
+        Node node = metadata.fetch().nodes().get(0);
 
         KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, false);
 
@@ -779,6 +748,7 @@ public void testChangingRegexSubscription() {
 
         consumer.subscribe(Pattern.compile(otherTopic), getConsumerRebalanceListener(consumer));
 
+        client.prepareMetadataUpdate(TestUtils.metadataUpdateWith(1, partitionCounts));
         prepareRebalance(client, node, singleton(otherTopic), assignor, singletonList(otherTopicPartition), coordinator);
         consumer.poll(Duration.ZERO);
 
@@ -789,14 +759,12 @@ public void testChangingRegexSubscription() {
     @Test
     public void testWakeupWithFetchDataAvailable() throws Exception {
         final Time time = new MockTime();
-        Cluster cluster = TestUtils.singletonCluster(topic, 1);
-        Node node = cluster.nodes().get(0);
-
         Metadata metadata = createMetadata();
-        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
-
         MockClient client = new MockClient(time, metadata);
-        client.setNode(node);
+
+        initMetadata(client, Collections.singletonMap(topic, 1));
+        Node node = metadata.fetch().nodes().get(0);
+
         PartitionAssignor assignor = new RoundRobinAssignor();
 
         KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, true);
@@ -838,16 +806,14 @@ public void run() {
     }
 
     @Test
-    public void testPollThrowsInterruptExceptionIfInterrupted() throws Exception {
+    public void testPollThrowsInterruptExceptionIfInterrupted() {
         final Time time = new MockTime();
-        Cluster cluster = TestUtils.singletonCluster(topic, 1);
-        final Node node = cluster.nodes().get(0);
+        final Metadata metadata = createMetadata();
+        final MockClient client = new MockClient(time, metadata);
 
-        Metadata metadata = createMetadata();
-        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
+        initMetadata(client, Collections.singletonMap(topic, 1));
+        Node node = metadata.fetch().nodes().get(0);
 
-        final MockClient client = new MockClient(time, metadata);
-        client.setNode(node);
         final PartitionAssignor assignor = new RoundRobinAssignor();
 
         KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, false);
@@ -872,14 +838,12 @@ public void testPollThrowsInterruptExceptionIfInterrupted() throws Exception {
     @Test
     public void fetchResponseWithUnexpectedPartitionIsIgnored() {
         Time time = new MockTime();
-        Cluster cluster = TestUtils.singletonCluster(singletonMap(topic, 1));
-        Node node = cluster.nodes().get(0);
-
         Metadata metadata = createMetadata();
-        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
-
         MockClient client = new MockClient(time, metadata);
-        client.setNode(node);
+
+        initMetadata(client, Collections.singletonMap(topic, 1));
+        Node node = metadata.fetch().nodes().get(0);
+
         PartitionAssignor assignor = new RangeAssignor();
 
         KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, true);
@@ -909,18 +873,16 @@ public void fetchResponseWithUnexpectedPartitionIsIgnored() {
     @Test
     public void testSubscriptionChangesWithAutoCommitEnabled() {
         Time time = new MockTime();
+        Metadata metadata = createMetadata();
+        MockClient client = new MockClient(time, metadata);
+
         Map<String, Integer> tpCounts = new HashMap<>();
         tpCounts.put(topic, 1);
         tpCounts.put(topic2, 1);
         tpCounts.put(topic3, 1);
-        Cluster cluster = TestUtils.singletonCluster(tpCounts);
-        Node node = cluster.nodes().get(0);
+        initMetadata(client, tpCounts);
+        Node node = metadata.fetch().nodes().get(0);
 
-        Metadata metadata = createMetadata();
-        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
-
-        MockClient client = new MockClient(time, metadata);
-        client.setNode(node);
         PartitionAssignor assignor = new RangeAssignor();
 
         KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, true);
@@ -1025,17 +987,15 @@ public void testSubscriptionChangesWithAutoCommitEnabled() {
     @Test
     public void testSubscriptionChangesWithAutoCommitDisabled() {
         Time time = new MockTime();
+        Metadata metadata = createMetadata();
+        MockClient client = new MockClient(time, metadata);
+
         Map<String, Integer> tpCounts = new HashMap<>();
         tpCounts.put(topic, 1);
         tpCounts.put(topic2, 1);
-        Cluster cluster = TestUtils.singletonCluster(tpCounts);
-        Node node = cluster.nodes().get(0);
+        initMetadata(client, tpCounts);
+        Node node = metadata.fetch().nodes().get(0);
 
-        Metadata metadata = createMetadata();
-        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
-
-        MockClient client = new MockClient(time, metadata);
-        client.setNode(node);
         PartitionAssignor assignor = new RangeAssignor();
 
         KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, false);
@@ -1088,17 +1048,15 @@ public void testSubscriptionChangesWithAutoCommitDisabled() {
     @Test
     public void testManualAssignmentChangeWithAutoCommitEnabled() {
         Time time = new MockTime();
+        Metadata metadata = createMetadata();
+        MockClient client = new MockClient(time, metadata);
+
         Map<String, Integer> tpCounts = new HashMap<>();
         tpCounts.put(topic, 1);
         tpCounts.put(topic2, 1);
-        Cluster cluster = TestUtils.singletonCluster(tpCounts);
-        Node node = cluster.nodes().get(0);
-
-        Metadata metadata = createMetadata();
-        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
+        initMetadata(client, tpCounts);
+        Node node = metadata.fetch().nodes().get(0);
 
-        MockClient client = new MockClient(time, metadata);
-        client.setNode(node);
         PartitionAssignor assignor = new RangeAssignor();
 
         KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, true);
@@ -1145,17 +1103,15 @@ public void testManualAssignmentChangeWithAutoCommitEnabled() {
     @Test
     public void testManualAssignmentChangeWithAutoCommitDisabled() {
         Time time = new MockTime();
+        Metadata metadata = createMetadata();
+        MockClient client = new MockClient(time, metadata);
+
         Map<String, Integer> tpCounts = new HashMap<>();
         tpCounts.put(topic, 1);
         tpCounts.put(topic2, 1);
-        Cluster cluster = TestUtils.singletonCluster(tpCounts);
-        Node node = cluster.nodes().get(0);
-
-        Metadata metadata = createMetadata();
-        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
+        initMetadata(client, tpCounts);
+        Node node = metadata.fetch().nodes().get(0);
 
-        MockClient client = new MockClient(time, metadata);
-        client.setNode(node);
         PartitionAssignor assignor = new RangeAssignor();
 
         KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, false);
@@ -1203,14 +1159,12 @@ public void testManualAssignmentChangeWithAutoCommitDisabled() {
     @Test
     public void testOffsetOfPausedPartitions() {
         Time time = new MockTime();
-        Cluster cluster = TestUtils.singletonCluster(topic, 2);
-        Node node = cluster.nodes().get(0);
-
         Metadata metadata = createMetadata();
-        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
-
         MockClient client = new MockClient(time, metadata);
-        client.setNode(node);
+
+        initMetadata(client, Collections.singletonMap(topic, 2));
+        Node node = metadata.fetch().nodes().get(0);
+
         PartitionAssignor assignor = new RangeAssignor();
 
         KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, true);
@@ -1334,14 +1288,12 @@ public void testMetricConfigRecordingLevel() {
     @Test
     public void shouldAttemptToRejoinGroupAfterSyncGroupFailed() throws Exception {
         Time time = new MockTime();
-        Cluster cluster = TestUtils.singletonCluster(topic, 1);
-        Node node = cluster.nodes().get(0);
+        Metadata metadata = createMetadata();
+        MockClient client = new MockClient(time, metadata);
 
-        Metadata metadata = new Metadata(0, Long.MAX_VALUE, false);
-        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
+        initMetadata(client, Collections.singletonMap(topic, 1));
+        Node node = metadata.fetch().nodes().get(0);
 
-        MockClient client = new MockClient(time, metadata);
-        client.setNode(node);
         PartitionAssignor assignor = new RoundRobinAssignor();
 
         KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, false);
@@ -1404,21 +1356,19 @@ private void consumerCloseTest(final long closeTimeoutMs,
                                    long waitMs,
                                    boolean interrupt) throws Exception {
         Time time = new MockTime();
-        Cluster cluster = TestUtils.singletonCluster(topic, 1);
-        Node node = cluster.nodes().get(0);
-
         Metadata metadata = createMetadata();
-        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
-
         MockClient client = new MockClient(time, metadata);
-        client.setNode(node);
+
+        initMetadata(client, Collections.singletonMap(topic, 1));
+        Node node = metadata.fetch().nodes().get(0);
+
         PartitionAssignor assignor = new RoundRobinAssignor();
 
         final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, false);
         consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
         Node coordinator = prepareRebalance(client, node, assignor, singletonList(tp0), null);
 
-        client.prepareMetadataUpdate(cluster, Collections.<String>emptySet());
+        client.prepareMetadataUpdate(TestUtils.metadataUpdateWith(1, Collections.singletonMap(topic, 1)));
 
         consumer.updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE));
 
@@ -1542,16 +1492,12 @@ public void testCommittedAuthenticationFaiure() {
 
     private KafkaConsumer<String, String> consumerWithPendingAuthentication() {
         Time time = new MockTime();
-        Map<String, Integer> tpCounts = new HashMap<>();
-        tpCounts.put(topic, 1);
-        Cluster cluster = TestUtils.singletonCluster(tpCounts);
-        Node node = cluster.nodes().get(0);
-
         Metadata metadata = createMetadata();
-        metadata.update(cluster, Collections.emptySet(), time.milliseconds());
-
         MockClient client = new MockClient(time, metadata);
-        client.setNode(node);
+
+        initMetadata(client, singletonMap(topic, 1));
+        Node node = metadata.fetch().nodes().get(0);
+
         PartitionAssignor assignor = new RangeAssignor();
 
         client.createPendingAuthenticationError(node, 0);
@@ -1861,29 +1807,24 @@ public void testCloseWithTimeUnit() {
     @Test(expected = InvalidTopicException.class)
     public void testSubscriptionOnInvalidTopic() {
         Time time = new MockTime();
-        Cluster cluster = TestUtils.singletonCluster();
-        Node node = cluster.nodes().get(0);
-
         Metadata metadata = createMetadata();
-        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
-
         MockClient client = new MockClient(time, metadata);
-        client.setNode(node);
+
+        initMetadata(client, Collections.singletonMap(topic, 1));
+        Cluster cluster = metadata.fetch();
+
         PartitionAssignor assignor = new RoundRobinAssignor();
 
         String invalidTopicName = "topic abc";  // Invalid topic name due to space
 
-        Set<String> invalidTopic = new HashSet<String>();
-        invalidTopic.add(invalidTopicName);
-        Cluster metadataUpdateResponseCluster = new Cluster(cluster.clusterResource().clusterId(),
-                                                            cluster.nodes(),
-                                                            new ArrayList<PartitionInfo>(0),
-                                                            Collections.<String>emptySet(),
-                                                            invalidTopic,
-                                                            cluster.internalTopics(),
-                                                            cluster.controller());
-        client.prepareMetadataUpdate(metadataUpdateResponseCluster, Collections.<String>emptySet());
-
+        List<MetadataResponse.TopicMetadata> topicMetadata = new ArrayList<>();
+        topicMetadata.add(new MetadataResponse.TopicMetadata(Errors.INVALID_TOPIC_EXCEPTION,
+                invalidTopicName, false, Collections.emptyList()));
+        MetadataResponse updateResponse = new MetadataResponse(cluster.nodes(),
+                cluster.clusterResource().clusterId(),
+                cluster.controller().id(),
+                topicMetadata);
+        client.prepareMetadataUpdate(updateResponse);
 
         KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, true);
         consumer.subscribe(singleton(invalidTopicName), getConsumerRebalanceListener(consumer));
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
index 004445f9a4a..a0e7ab0cccb 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
@@ -18,7 +18,6 @@
 
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.MockClient;
-import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.WakeupException;
@@ -52,6 +51,7 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import static java.util.Collections.emptyMap;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotSame;
@@ -87,18 +87,15 @@ private void setupCoordinator(int retryBackoffMs) {
 
     private void setupCoordinator(int retryBackoffMs, int rebalanceTimeoutMs) {
         this.mockTime = new MockTime();
-        this.mockClient = new MockClient(mockTime);
-
         Metadata metadata = new Metadata(retryBackoffMs, 60 * 60 * 1000L, true);
+
+        this.mockClient = new MockClient(mockTime, metadata);
         this.consumerClient = new ConsumerNetworkClient(new LogContext(), mockClient, metadata, mockTime,
                 retryBackoffMs, REQUEST_TIMEOUT_MS, HEARTBEAT_INTERVAL_MS);
         Metrics metrics = new Metrics();
 
-        Cluster cluster = TestUtils.singletonCluster("topic", 1);
-        metadata.update(cluster, Collections.emptySet(), mockTime.milliseconds());
-        this.node = cluster.nodes().get(0);
-        mockClient.setNode(node);
-
+        mockClient.updateMetadata(TestUtils.metadataUpdateWith(1, emptyMap()));
+        this.node = metadata.fetch().nodes().get(0);
         this.coordinatorNode = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
         this.coordinator = new DummyCoordinator(consumerClient, metrics, mockTime, rebalanceTimeoutMs, retryBackoffMs);
     }
@@ -244,11 +241,11 @@ public boolean matches(AbstractRequest body) {
     public void testLookupCoordinator() {
         setupCoordinator();
 
-        mockClient.setNode(null);
+        mockClient.blackout(node, 50);
         RequestFuture<Void> noBrokersAvailableFuture = coordinator.lookupCoordinator();
         assertTrue("Failed future expected", noBrokersAvailableFuture.failed());
+        mockTime.sleep(50);
 
-        mockClient.setNode(node);
         RequestFuture<Void> future = coordinator.lookupCoordinator();
         assertFalse("Request not sent", future.isDone());
         assertSame("New request sent while one is in progress", future, coordinator.lookupCoordinator());
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index af073f1d560..72808c87fec 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -28,7 +28,6 @@
 import org.apache.kafka.clients.consumer.RangeAssignor;
 import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
 import org.apache.kafka.clients.consumer.RoundRobinAssignor;
-import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
@@ -40,6 +39,7 @@
 import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.OffsetMetadataTooLarge;
 import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.internals.Topic;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.RecordBatch;
@@ -51,6 +51,7 @@
 import org.apache.kafka.common.requests.JoinGroupResponse;
 import org.apache.kafka.common.requests.LeaveGroupRequest;
 import org.apache.kafka.common.requests.LeaveGroupResponse;
+import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.requests.OffsetCommitRequest;
 import org.apache.kafka.common.requests.OffsetCommitResponse;
 import org.apache.kafka.common.requests.OffsetFetchResponse;
@@ -114,13 +115,13 @@
     private MockPartitionAssignor partitionAssignor = new MockPartitionAssignor();
     private List<PartitionAssignor> assignors = Collections.singletonList(partitionAssignor);
     private MockClient client;
-    private Cluster cluster = TestUtils.clusterWith(1, new HashMap<String, Integer>() {
+    private MetadataResponse metadataResponse = TestUtils.metadataUpdateWith(1, new HashMap<String, Integer>() {
         {
             put(topic1, 1);
             put(topic2, 1);
         }
     });
-    private Node node = cluster.nodes().get(0);
+    private Node node = metadataResponse.brokers().iterator().next();
     private SubscriptionState subscriptions;
     private Metadata metadata;
     private Metrics metrics;
@@ -133,8 +134,8 @@
     public void setup() {
         this.subscriptions = new SubscriptionState(OffsetResetStrategy.EARLIEST);
         this.metadata = new Metadata(0, Long.MAX_VALUE, true);
-        this.metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
         this.client = new MockClient(time, metadata);
+        this.client.updateMetadata(metadataResponse);
         this.consumerClient = new ConsumerNetworkClient(new LogContext(), client, metadata, time, 100,
                 requestTimeoutMs, Integer.MAX_VALUE);
         this.metrics = new Metrics(time);
@@ -142,7 +143,6 @@ public void setup() {
         this.mockOffsetCommitCallback = new MockCommitCallback();
         this.partitionAssignor.clear();
 
-        client.setNode(node);
         this.coordinator = buildCoordinator(metrics, assignors, ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, false, true);
     }
 
@@ -366,7 +366,7 @@ public void testJoinGroupInvalidGroupId() {
 
         // ensure metadata is up-to-date for leader
         metadata.setTopics(singletonList(topic1));
-        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
+        client.updateMetadata(metadataResponse);
 
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
@@ -384,7 +384,7 @@ public void testNormalJoinGroupLeader() {
 
         // ensure metadata is up-to-date for leader
         metadata.setTopics(singletonList(topic1));
-        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
+        client.updateMetadata(metadataResponse);
 
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
@@ -423,7 +423,7 @@ public void testPatternJoinGroupLeader() {
         // partially update the metadata with one topic first,
         // let the leader to refresh metadata during assignment
         metadata.setTopics(singletonList(topic1));
-        metadata.update(TestUtils.singletonCluster(topic1, 1), Collections.<String>emptySet(), time.milliseconds());
+        client.updateMetadata(TestUtils.metadataUpdateWith(1, singletonMap(topic1, 1)));
 
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
@@ -443,7 +443,7 @@ public boolean matches(AbstractRequest body) {
             }
         }, syncGroupResponse(Arrays.asList(t1p, t2p), Errors.NONE));
         // expect client to force updating the metadata, if yes gives it both topics
-        client.prepareMetadataUpdate(cluster, Collections.<String>emptySet());
+        client.prepareMetadataUpdate(metadataResponse);
 
         coordinator.poll(time.timer(Long.MAX_VALUE));
 
@@ -463,7 +463,7 @@ public void testMetadataRefreshDuringRebalance() {
 
         subscriptions.subscribe(Pattern.compile(".*"), rebalanceListener);
         metadata.needMetadataForAllTopics(true);
-        metadata.update(TestUtils.singletonCluster(topic1, 1), Collections.<String>emptySet(), time.milliseconds());
+        client.updateMetadata(TestUtils.metadataUpdateWith(1, singletonMap(topic1, 1)));
 
         assertEquals(singleton(topic1), subscriptions.subscription());
 
@@ -484,7 +484,7 @@ public boolean matches(AbstractRequest body) {
                 final Map<String, Integer> updatedPartitions = new HashMap<>();
                 for (String topic : updatedSubscription)
                     updatedPartitions.put(topic, 1);
-                metadata.update(TestUtils.clusterWith(1, updatedPartitions), Collections.<String>emptySet(), time.milliseconds());
+                client.updateMetadata(TestUtils.metadataUpdateWith(1, updatedPartitions));
                 return true;
             }
         }, syncGroupResponse(singletonList(t1p), Errors.NONE));
@@ -526,15 +526,14 @@ public void testForceMetadataRefreshForPatternSubscriptionDuringRebalance() {
         final String consumerId = "consumer";
 
         subscriptions.subscribe(Pattern.compile(".*"), rebalanceListener);
-        metadata.update(TestUtils.singletonCluster(topic1, 1), Collections.<String>emptySet(),
-            time.milliseconds());
+        client.updateMetadata(TestUtils.metadataUpdateWith(1, singletonMap(topic1, 1)));
         assertEquals(singleton(topic1), subscriptions.subscription());
 
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
 
         // Instrument the test so that metadata will contain two topics after next refresh.
-        client.prepareMetadataUpdate(cluster, Collections.emptySet());
+        client.prepareMetadataUpdate(metadataResponse);
 
         client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE));
         client.prepareResponse(new MockClient.RequestMatcher() {
@@ -571,7 +570,7 @@ public void testWakeupDuringJoin() {
 
         // ensure metadata is up-to-date for leader
         metadata.setTopics(singletonList(topic1));
-        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
+        client.updateMetadata(metadataResponse);
 
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
@@ -671,7 +670,7 @@ public void testPatternJoinGroupFollower() {
         // partially update the metadata with one topic first,
         // let the leader to refresh metadata during assignment
         metadata.setTopics(singletonList(topic1));
-        metadata.update(TestUtils.singletonCluster(topic1, 1), Collections.<String>emptySet(), time.milliseconds());
+        client.updateMetadata(TestUtils.metadataUpdateWith(1, singletonMap(topic1, 1)));
 
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
@@ -688,7 +687,7 @@ public boolean matches(AbstractRequest body) {
             }
         }, syncGroupResponse(Arrays.asList(t1p, t2p), Errors.NONE));
         // expect client to force updating the metadata, if yes gives it both topics
-        client.prepareMetadataUpdate(cluster, Collections.emptySet());
+        client.prepareMetadataUpdate(metadataResponse);
 
         coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE));
 
@@ -847,7 +846,7 @@ public void testMetadataChangeTriggersRebalance() {
 
         // ensure metadata is up-to-date for leader
         metadata.setTopics(singletonList(topic1));
-        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
+        client.updateMetadata(metadataResponse);
 
         subscriptions.subscribe(singleton(topic1), rebalanceListener);
 
@@ -866,7 +865,7 @@ public void testMetadataChangeTriggersRebalance() {
         assertFalse(coordinator.rejoinNeededOrPending());
 
         // a new partition is added to the topic
-        metadata.update(TestUtils.singletonCluster(topic1, 2), Collections.<String>emptySet(), time.milliseconds());
+        metadata.update(TestUtils.metadataUpdateWith(1, singletonMap(topic1, 2)), time.milliseconds());
 
         // we should detect the change and ask for reassignment
         assertTrue(coordinator.rejoinNeededOrPending());
@@ -886,7 +885,7 @@ public void testUpdateMetadataDuringRebalance() {
         metadata.setTopics(topics);
 
         // we only have metadata for one topic initially
-        metadata.update(TestUtils.singletonCluster(topic1, 1), Collections.<String>emptySet(), time.milliseconds());
+        client.updateMetadata(TestUtils.metadataUpdateWith(1, singletonMap(topic1, 1)));
 
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
@@ -907,7 +906,7 @@ public boolean matches(AbstractRequest body) {
                     Map<String, Integer> topicPartitionCounts = new HashMap<>();
                     topicPartitionCounts.put(topic1, 1);
                     topicPartitionCounts.put(topic2, 1);
-                    metadata.update(TestUtils.singletonCluster(topicPartitionCounts), Collections.<String>emptySet(), time.milliseconds());
+                    client.updateMetadata(TestUtils.metadataUpdateWith(1, topicPartitionCounts));
                     return true;
                 }
                 return false;
@@ -948,7 +947,7 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
         metadata.setTopics(topics);
 
         // we only have metadata for one topic initially
-        metadata.update(TestUtils.singletonCluster(topic, 1), Collections.emptySet(), time.milliseconds());
+        client.updateMetadata(TestUtils.metadataUpdateWith(1, singletonMap(topic1, 1)));
 
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
@@ -977,33 +976,32 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
 
     @Test
     public void testRebalanceAfterTopicUnavailableWithSubscribe() {
-        unavailableTopicTest(false, false, Collections.<String>emptySet());
+        unavailableTopicTest(false, Collections.emptySet());
     }
 
     @Test
     public void testRebalanceAfterTopicUnavailableWithPatternSubscribe() {
-        unavailableTopicTest(true, false, Collections.<String>emptySet());
+        unavailableTopicTest(true, Collections.emptySet());
     }
 
     @Test
-    public void testRebalanceAfterNotMatchingTopicUnavailableWithPatternSSubscribe() {
-        unavailableTopicTest(true, false, Collections.singleton("notmatching"));
+    public void testRebalanceAfterNotMatchingTopicUnavailableWithPatternSubscribe() {
+        unavailableTopicTest(true, Collections.singleton("notmatching"));
     }
 
     @Test
     public void testAssignWithTopicUnavailable() {
-        unavailableTopicTest(true, false, Collections.<String>emptySet());
+        unavailableTopicTest(true, Collections.emptySet());
     }
 
-    private void unavailableTopicTest(boolean patternSubscribe, boolean assign, Set<String> unavailableTopicsInLastMetadata) {
+    private void unavailableTopicTest(boolean patternSubscribe, Set<String> unavailableTopicsInLastMetadata) {
         final String consumerId = "consumer";
 
         metadata.setTopics(singletonList(topic1));
-        client.prepareMetadataUpdate(Cluster.empty(), Collections.singleton("test1"));
+        client.prepareMetadataUpdate(TestUtils.metadataUpdateWith("kafka-cluster", 1,
+                Collections.singletonMap(topic1, Errors.UNKNOWN_TOPIC_OR_PARTITION), Collections.emptyMap()));
 
-        if (assign)
-            subscriptions.assignFromUser(singleton(t1p));
-        else if (patternSubscribe)
+        if (patternSubscribe)
             subscriptions.subscribe(Pattern.compile("test.*"), rebalanceListener);
         else
             subscriptions.subscribe(singleton(topic1), rebalanceListener);
@@ -1017,32 +1015,34 @@ else if (patternSubscribe)
         client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, Errors.NONE));
         client.prepareResponse(syncGroupResponse(Collections.<TopicPartition>emptyList(), Errors.NONE));
         coordinator.poll(time.timer(Long.MAX_VALUE));
-        if (!assign) {
-            assertFalse(coordinator.rejoinNeededOrPending());
-            assertEquals(Collections.<TopicPartition>emptySet(), rebalanceListener.assigned);
-        }
+        assertFalse(coordinator.rejoinNeededOrPending());
+        assertEquals(Collections.<TopicPartition>emptySet(), rebalanceListener.assigned);
         assertTrue("Metadata refresh not requested for unavailable partitions", metadata.updateRequested());
 
-        client.prepareMetadataUpdate(cluster, unavailableTopicsInLastMetadata);
+        Map<String, Errors> topicErrors = new HashMap<>();
+        for (String topic : unavailableTopicsInLastMetadata)
+            topicErrors.put(topic, Errors.UNKNOWN_TOPIC_OR_PARTITION);
+
+        client.prepareMetadataUpdate(TestUtils.metadataUpdateWith("kafka-cluster", 1,
+                topicErrors, singletonMap(topic1, 1)));
+
         client.poll(0, time.milliseconds());
         client.prepareResponse(joinGroupLeaderResponse(2, consumerId, memberSubscriptions, Errors.NONE));
         client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
         coordinator.poll(time.timer(Long.MAX_VALUE));
 
         assertFalse("Metadata refresh requested unnecessarily", metadata.updateRequested());
-        if (!assign) {
-            assertFalse(coordinator.rejoinNeededOrPending());
-            assertEquals(singleton(t1p), rebalanceListener.assigned);
-        }
+        assertFalse(coordinator.rejoinNeededOrPending());
+        assertEquals(singleton(t1p), rebalanceListener.assigned);
     }
 
     @Test
     public void testExcludeInternalTopicsConfigOption() {
         subscriptions.subscribe(Pattern.compile(".*"), rebalanceListener);
 
-        metadata.update(TestUtils.singletonCluster(TestUtils.GROUP_METADATA_TOPIC_NAME, 2), Collections.<String>emptySet(), time.milliseconds());
+        client.updateMetadata(TestUtils.metadataUpdateWith(1, singletonMap(Topic.GROUP_METADATA_TOPIC_NAME, 1)));
 
-        assertFalse(subscriptions.subscription().contains(TestUtils.GROUP_METADATA_TOPIC_NAME));
+        assertFalse(subscriptions.subscription().contains(Topic.GROUP_METADATA_TOPIC_NAME));
     }
 
     @Test
@@ -1050,9 +1050,9 @@ public void testIncludeInternalTopicsConfigOption() {
         coordinator = buildCoordinator(new Metrics(), assignors, false, false, true);
         subscriptions.subscribe(Pattern.compile(".*"), rebalanceListener);
 
-        metadata.update(TestUtils.singletonCluster(TestUtils.GROUP_METADATA_TOPIC_NAME, 2), Collections.<String>emptySet(), time.milliseconds());
+        client.updateMetadata(TestUtils.metadataUpdateWith(1, singletonMap(Topic.GROUP_METADATA_TOPIC_NAME, 2)));
 
-        assertTrue(subscriptions.subscription().contains(TestUtils.GROUP_METADATA_TOPIC_NAME));
+        assertTrue(subscriptions.subscription().contains(Topic.GROUP_METADATA_TOPIC_NAME));
     }
 
     @Test
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
index 8f6328d6a18..45b420e40c7 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
@@ -50,10 +50,10 @@
 
     private String topicName = "test";
     private MockTime time = new MockTime(1);
-    private MockClient client = new MockClient(time);
     private Cluster cluster = TestUtils.singletonCluster(topicName, 1);
     private Node node = cluster.nodes().get(0);
     private Metadata metadata = new Metadata(0, Long.MAX_VALUE, true);
+    private MockClient client = new MockClient(time, metadata);
     private ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(new LogContext(),
             client, metadata, time, 100, 1000, Integer.MAX_VALUE);
 
@@ -273,7 +273,7 @@ public void sendExpiry() {
         int requestTimeoutMs = 10;
         final AtomicBoolean isReady = new AtomicBoolean();
         final AtomicBoolean disconnected = new AtomicBoolean();
-        client = new MockClient(time) {
+        client = new MockClient(time, metadata) {
             @Override
             public boolean ready(Node node, long now) {
                 if (isReady.get())
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 94d8d5b1afb..c7b0b302ede 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -113,6 +113,7 @@
 import java.util.stream.Collectors;
 
 import static java.util.Collections.singleton;
+import static java.util.Collections.singletonMap;
 import static org.apache.kafka.common.requests.FetchMetadata.INVALID_SESSION_ID;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
@@ -122,7 +123,6 @@
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-
 @SuppressWarnings("deprecation")
 public class FetcherTest {
     private ConsumerRebalanceListener listener = new NoOpConsumerRebalanceListener();
@@ -140,12 +140,12 @@
     private long retryBackoffMs = 100;
     private long requestTimeoutMs = 30000;
     private MockTime time = new MockTime(1);
+    private MetadataResponse initialUpdateResponse = TestUtils.metadataUpdateWith(1, singletonMap(topicName, 4));
     private Metadata metadata = new Metadata(0, Long.MAX_VALUE, true);
     private MockClient client = new MockClient(time, metadata);
-    private Cluster cluster = TestUtils.singletonCluster(topicName, 4);
-    private Node node = cluster.nodes().get(0);
+    private Node node;
     private Metrics metrics = new Metrics(time);
-    FetcherMetricsRegistry metricsRegistry = new FetcherMetricsRegistry("consumer" + groupId);
+    private FetcherMetricsRegistry metricsRegistry = new FetcherMetricsRegistry("consumer" + groupId);
 
     private SubscriptionState subscriptions = new SubscriptionState(OffsetResetStrategy.EARLIEST);
     private SubscriptionState subscriptionsNoAutoReset = new SubscriptionState(OffsetResetStrategy.NONE);
@@ -163,10 +163,9 @@
     private ExecutorService executorService;
 
     @Before
-    public void setup() throws Exception {
-        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
-        client.setNode(node);
-
+    public void setup() {
+        client.updateMetadata(initialUpdateResponse);
+        node = metadata.fetch().nodes().get(0);
         records = buildRecords(1L, 3, 1);
         nextRecords = buildRecords(4L, 2, 4);
         emptyRecords = buildRecords(0L, 0, 0);
@@ -1204,7 +1203,7 @@ public void testResetOffsetsMetadataRefresh() {
         assertFalse(subscriptions.hasValidPosition(tp0));
 
         // Expect a metadata refresh
-        client.prepareMetadataUpdate(cluster, Collections.<String>emptySet());
+        client.prepareMetadataUpdate(initialUpdateResponse);
         consumerClient.pollNoWakeup();
         assertFalse(client.hasPendingMetadataUpdates());
 
@@ -1233,7 +1232,7 @@ public void testUpdateFetchPositionDisconnect() {
         assertFalse(subscriptions.hasValidPosition(tp0));
 
         // Expect a metadata refresh
-        client.prepareMetadataUpdate(cluster, Collections.<String>emptySet());
+        client.prepareMetadataUpdate(initialUpdateResponse);
         consumerClient.pollNoWakeup();
         assertFalse(client.hasPendingMetadataUpdates());
 
@@ -1436,7 +1435,7 @@ public void testGetAllTopics() {
 
         Map<String, List<PartitionInfo>> allTopics = fetcher.getAllTopicMetadata(time.timer(5000L));
 
-        assertEquals(cluster.topics().size(), allTopics.size());
+        assertEquals(initialUpdateResponse.topicMetadata().size(), allTopics.size());
     }
 
     @Test
@@ -1445,7 +1444,7 @@ public void testGetAllTopicsDisconnect() {
         client.prepareResponse(null, true);
         client.prepareResponse(newMetadataResponse(topicName, Errors.NONE));
         Map<String, List<PartitionInfo>> allTopics = fetcher.getAllTopicMetadata(time.timer(5000L));
-        assertEquals(cluster.topics().size(), allTopics.size());
+        assertEquals(initialUpdateResponse.topicMetadata().size(), allTopics.size());
     }
 
     @Test(expected = TimeoutException.class)
@@ -1535,7 +1534,7 @@ public void testGetTopicMetadataOfflinePartitions() {
         Assert.assertNotNull(topicMetadata);
         Assert.assertNotNull(topicMetadata.get(topicName));
         //noinspection ConstantConditions
-        Assert.assertEquals((int) cluster.partitionCountForTopic(topicName), topicMetadata.get(topicName).size());
+        Assert.assertEquals((int) metadata.fetch().partitionCountForTopic(topicName), topicMetadata.get(topicName).size());
     }
 
     /*
@@ -1717,8 +1716,7 @@ public void testFetchResponseMetrics() {
         Map<String, Integer> partitionCounts = new HashMap<>();
         partitionCounts.put(topic1, 1);
         partitionCounts.put(topic2, 1);
-        Cluster cluster = TestUtils.clusterWith(1, partitionCounts);
-        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
+        client.updateMetadata(TestUtils.metadataUpdateWith(1, partitionCounts));
 
         subscriptions.assignFromUser(Utils.mkSet(tp1, tp2));
 
@@ -1950,21 +1948,24 @@ public void testGetOffsetsForTimesWhenSomeTopicPartitionLeadersNotKnownInitially
         client.reset();
 
         // Metadata initially has one topic
-        Cluster cluster = TestUtils.clusterWith(3, topicName, 2);
-        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
+        MetadataResponse initialMetadata = TestUtils.metadataUpdateWith(3, singletonMap(topicName, 2));
+        client.updateMetadata(initialMetadata);
 
         // The first metadata refresh should contain one topic
-        client.prepareMetadataUpdate(cluster, Collections.<String>emptySet(), false);
-        client.prepareResponseFrom(listOffsetResponse(tp0, Errors.NONE, 1000L, 11L), cluster.leaderFor(tp0));
-        client.prepareResponseFrom(listOffsetResponse(tp1, Errors.NONE, 1000L, 32L), cluster.leaderFor(tp1));
+        client.prepareMetadataUpdate(initialMetadata, false);
+        client.prepareResponseFrom(listOffsetResponse(tp0, Errors.NONE, 1000L, 11L),
+                metadata.fetch().leaderFor(tp0));
+        client.prepareResponseFrom(listOffsetResponse(tp1, Errors.NONE, 1000L, 32L),
+                metadata.fetch().leaderFor(tp1));
 
         // Second metadata refresh should contain two topics
         Map<String, Integer> partitionNumByTopic = new HashMap<>();
         partitionNumByTopic.put(topicName, 2);
         partitionNumByTopic.put(anotherTopic, 1);
-        Cluster updatedCluster = TestUtils.clusterWith(3, partitionNumByTopic);
-        client.prepareMetadataUpdate(updatedCluster, Collections.<String>emptySet(), false);
-        client.prepareResponseFrom(listOffsetResponse(t2p0, Errors.NONE, 1000L, 54L), cluster.leaderFor(t2p0));
+        MetadataResponse updatedMetadata = TestUtils.metadataUpdateWith(3, partitionNumByTopic);
+        client.prepareMetadataUpdate(updatedMetadata, false);
+        client.prepareResponseFrom(listOffsetResponse(t2p0, Errors.NONE, 1000L, 54L),
+                metadata.fetch().leaderFor(t2p0));
 
         Map<TopicPartition, Long> timestampToSearch = new HashMap<>();
         timestampToSearch.put(tp0, ListOffsetRequest.LATEST_TIMESTAMP);
@@ -2518,9 +2519,11 @@ public void testFetcherConcurrency() throws Exception {
         Set<TopicPartition> topicPartitions = new HashSet<>();
         for (int i = 0; i < numPartitions; i++)
             topicPartitions.add(new TopicPartition(topicName, i));
-        cluster = TestUtils.singletonCluster(topicName, numPartitions);
-        metadata.update(cluster, Collections.emptySet(), time.milliseconds());
-        client.setNode(node);
+
+        MetadataResponse initialMetadataResponse = TestUtils.metadataUpdateWith(1,
+                singletonMap(topicName, numPartitions));
+        client.updateMetadata(initialMetadataResponse);
+        node = metadata.fetch().nodes().get(0);
         fetchSize = 10000;
 
         Fetcher<byte[], byte[]> fetcher = new Fetcher<byte[], byte[]>(
@@ -2735,23 +2738,28 @@ private void testGetOffsetsForTimesWithError(Errors errorForP0,
         String topicName2 = "topic2";
         TopicPartition t2p0 = new TopicPartition(topicName2, 0);
         // Expect a metadata refresh.
-        metadata.update(Cluster.bootstrap(ClientUtils.parseAndValidateAddresses(Collections.singletonList("1.1.1.1:1111"), ClientDnsLookup.DEFAULT.toString())),
-                        Collections.<String>emptySet(),
-                        time.milliseconds());
+        metadata.bootstrap(ClientUtils.parseAndValidateAddresses(Collections.singletonList("1.1.1.1:1111"),
+                ClientDnsLookup.DEFAULT), time.milliseconds());
 
         Map<String, Integer> partitionNumByTopic = new HashMap<>();
         partitionNumByTopic.put(topicName, 2);
         partitionNumByTopic.put(topicName2, 1);
-        cluster = TestUtils.clusterWith(2, partitionNumByTopic);
+        MetadataResponse updateMetadataResponse = TestUtils.metadataUpdateWith(2, partitionNumByTopic);
+        Cluster updatedCluster = updateMetadataResponse.cluster();
+
         // The metadata refresh should contain all the topics.
-        client.prepareMetadataUpdate(cluster, Collections.<String>emptySet(), true);
+        client.prepareMetadataUpdate(updateMetadataResponse, true);
 
         // First try should fail due to metadata error.
-        client.prepareResponseFrom(listOffsetResponse(t2p0, errorForP0, offsetForP0, offsetForP0), cluster.leaderFor(t2p0));
-        client.prepareResponseFrom(listOffsetResponse(tp1, errorForP1, offsetForP1, offsetForP1), cluster.leaderFor(tp1));
+        client.prepareResponseFrom(listOffsetResponse(t2p0, errorForP0, offsetForP0, offsetForP0),
+                updatedCluster.leaderFor(t2p0));
+        client.prepareResponseFrom(listOffsetResponse(tp1, errorForP1, offsetForP1, offsetForP1),
+                updatedCluster.leaderFor(tp1));
         // Second try should succeed.
-        client.prepareResponseFrom(listOffsetResponse(t2p0, Errors.NONE, offsetForP0, offsetForP0), cluster.leaderFor(t2p0));
-        client.prepareResponseFrom(listOffsetResponse(tp1, Errors.NONE, offsetForP1, offsetForP1), cluster.leaderFor(tp1));
+        client.prepareResponseFrom(listOffsetResponse(t2p0, Errors.NONE, offsetForP0, offsetForP0),
+                updatedCluster.leaderFor(t2p0));
+        client.prepareResponseFrom(listOffsetResponse(tp1, Errors.NONE, offsetForP1, offsetForP1),
+                updatedCluster.leaderFor(tp1));
 
         Map<TopicPartition, Long> timestampToSearch = new HashMap<>();
         timestampToSearch.put(t2p0, 0L);
@@ -2777,15 +2785,16 @@ private void testGetOffsetsForTimesWithError(Errors errorForP0,
     private void testGetOffsetsForTimesWithUnknownOffset() {
         client.reset();
         // Ensure metadata has both partition.
-        Cluster cluster = TestUtils.clusterWith(1, topicName, 1);
-        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
+        MetadataResponse initialMetadataUpdate = TestUtils.metadataUpdateWith(1, singletonMap(topicName, 1));
+        client.updateMetadata(initialMetadataUpdate);
 
         Map<TopicPartition, ListOffsetResponse.PartitionData> partitionData = new HashMap<>();
         partitionData.put(tp0, new ListOffsetResponse.PartitionData(Errors.NONE,
                 ListOffsetResponse.UNKNOWN_TIMESTAMP, ListOffsetResponse.UNKNOWN_OFFSET,
                 Optional.empty()));
 
-        client.prepareResponseFrom(new ListOffsetResponse(0, partitionData), cluster.leaderFor(tp0));
+        client.prepareResponseFrom(new ListOffsetResponse(0, partitionData),
+                metadata.fetch().leaderFor(tp0));
 
         Map<TopicPartition, Long> timestampToSearch = new HashMap<>();
         timestampToSearch.put(tp0, 0L);
@@ -2851,20 +2860,20 @@ private ListOffsetResponse listOffsetResponse(TopicPartition tp, Errors error, l
     private MetadataResponse newMetadataResponse(String topic, Errors error) {
         List<MetadataResponse.PartitionMetadata> partitionsMetadata = new ArrayList<>();
         if (error == Errors.NONE) {
-            for (PartitionInfo partitionInfo : cluster.partitionsForTopic(topic)) {
-                partitionsMetadata.add(new MetadataResponse.PartitionMetadata(
-                        Errors.NONE,
-                        partitionInfo.partition(),
-                        partitionInfo.leader(),
-                        Optional.empty(),
-                        Arrays.asList(partitionInfo.replicas()),
-                        Arrays.asList(partitionInfo.inSyncReplicas()),
-                        Arrays.asList(partitionInfo.offlineReplicas())));
-            }
+            Optional<MetadataResponse.TopicMetadata> foundMetadata = initialUpdateResponse.topicMetadata()
+                    .stream()
+                    .filter(topicMetadata -> topicMetadata.topic().equals(topic))
+                    .findFirst();
+            foundMetadata.ifPresent(topicMetadata -> {
+                partitionsMetadata.addAll(topicMetadata.partitionMetadata());
+            });
         }
 
-        MetadataResponse.TopicMetadata topicMetadata = new MetadataResponse.TopicMetadata(error, topic, false, partitionsMetadata);
-        return new MetadataResponse(cluster.nodes(), null, MetadataResponse.NO_CONTROLLER_ID, Arrays.asList(topicMetadata));
+        MetadataResponse.TopicMetadata topicMetadata = new MetadataResponse.TopicMetadata(error, topic, false,
+                partitionsMetadata);
+        List<Node> brokers = new ArrayList<>(initialUpdateResponse.brokers());
+        return new MetadataResponse(brokers, initialUpdateResponse.clusterId(),
+                initialUpdateResponse.controller().id(), Collections.singletonList(topicMetadata));
     }
 
     private Fetcher<byte[], byte[]> createFetcher(SubscriptionState subscriptions,
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index b5d7709de01..b544a654de4 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -16,9 +16,6 @@
  */
 package org.apache.kafka.clients.producer;
 
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.Set;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.KafkaClient;
 import org.apache.kafka.clients.Metadata;
@@ -37,6 +34,8 @@
 import org.apache.kafka.common.internals.ClusterResourceListeners;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.network.Selectable;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.ExtendedSerializer;
 import org.apache.kafka.common.serialization.Serializer;
@@ -52,10 +51,12 @@
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.ExecutorService;
@@ -66,6 +67,8 @@
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.junit.Assert.assertArrayEquals;
+import static java.util.Collections.emptyMap;
+import static java.util.Collections.singletonMap;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -209,20 +212,16 @@ public void testPartitionerClose() {
 
     @Test
     public void shouldCloseProperlyAndThrowIfInterrupted() throws Exception {
-        Map<String, Object> configs = new HashMap();
+        Map<String, Object> configs = new HashMap<>();
         configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
         configs.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MockPartitioner.class.getName());
         configs.put(ProducerConfig.BATCH_SIZE_CONFIG, "1");
 
         Time time = new MockTime();
-        Cluster cluster = TestUtils.singletonCluster("topic", 1);
-        Node node = cluster.nodes().get(0);
-
+        MetadataResponse initialUpdateResponse = TestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
         Metadata metadata = new Metadata(0, Long.MAX_VALUE, true);
-        metadata.update(cluster, Collections.emptySet(), time.milliseconds());
-
         MockClient client = new MockClient(time, metadata);
-        client.setNode(node);
+        client.updateMetadata(initialUpdateResponse);
 
         final Producer<String, String> producer = new KafkaProducer<>(configs, new StringSerializer(),
                 new StringSerializer(), metadata, client, null, time);
@@ -443,7 +442,9 @@ public void testTopicRefreshInMetadata() throws InterruptedException {
                 for (int i = 0; i < 10; i++) {
                     while (!metadata.updateRequested() && System.currentTimeMillis() - startTimeMs < 1000)
                         Thread.yield();
-                    metadata.update(Cluster.empty(), Collections.singleton(topic), time.milliseconds());
+                    MetadataResponse updateResponse = TestUtils.metadataUpdateWith("kafka-cluster", 1,
+                            singletonMap(topic, Errors.UNKNOWN_TOPIC_OR_PARTITION), emptyMap());
+                    metadata.update(updateResponse, time.milliseconds());
                     time.sleep(60 * 1000L);
                 }
             });
@@ -479,14 +480,9 @@ public void testHeaders() {
         Serializer<String> valueSerializer = mock(serializerClassToMock);
 
         String topic = "topic";
-        final Cluster cluster = new Cluster(
-                "dummy",
-                Collections.singletonList(new Node(0, "host1", 1000)),
-                Collections.singletonList(new PartitionInfo(topic, 0, null, new Node[0], new Node[0])),
-                Collections.emptySet(),
-                Collections.emptySet());
         Metadata metadata = new Metadata(0, 90000, true);
-        metadata.update(cluster, Collections.emptySet(), Time.SYSTEM.milliseconds());
+        MetadataResponse initialUpdateResponse = TestUtils.metadataUpdateWith(1, singletonMap(topic, 1));
+        metadata.update(initialUpdateResponse, Time.SYSTEM.milliseconds());
 
         KafkaProducer<String, String> producer = new KafkaProducer<>(configs, keySerializer, valueSerializer, metadata,
                 null, null, Time.SYSTEM);
@@ -553,13 +549,8 @@ public void testInterceptorPartitionSetOnTooLargeRecord() {
         ProducerRecord<String, String> record = new ProducerRecord<>(topic, "value");
 
         Metadata metadata = new Metadata(0, 90000, true);
-        final Cluster cluster = new Cluster(
-            "dummy",
-            Collections.singletonList(new Node(0, "host1", 1000)),
-            Collections.singletonList(new PartitionInfo(topic, 0, null, new Node[0], new Node[0])),
-            Collections.emptySet(),
-            Collections.emptySet());
-        metadata.update(cluster, Collections.emptySet(), Time.SYSTEM.milliseconds());
+        MetadataResponse initialUpdateResponse = TestUtils.metadataUpdateWith(1, singletonMap(topic, 1));
+        metadata.update(initialUpdateResponse, Time.SYSTEM.milliseconds());
 
         @SuppressWarnings("unchecked") // it is safe to suppress, since this is a mock class
                 ProducerInterceptors<String, String> interceptors = mock(ProducerInterceptors.class);
@@ -595,15 +586,12 @@ public void testInitTransactionTimeout() {
         configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5);
         configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
 
-        Time time = Time.SYSTEM;
-        Cluster cluster = TestUtils.singletonCluster("topic", 1);
-        Node node = cluster.nodes().get(0);
-
+        Time time = new MockTime(1);
+        MetadataResponse initialUpdateResponse = TestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
         Metadata metadata = new Metadata(0, Long.MAX_VALUE, true);
-        metadata.update(cluster, Collections.emptySet(), time.milliseconds());
+        metadata.update(initialUpdateResponse, time.milliseconds());
 
         MockClient client = new MockClient(time, metadata);
-        client.setNode(node);
 
         try (Producer<String, String> producer = new KafkaProducer<>(configs, new StringSerializer(),
                 new StringSerializer(), metadata, client, null, time)) {
@@ -620,14 +608,11 @@ public void testOnlyCanExecuteCloseAfterInitTransactionsTimeout() {
         configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
 
         Time time = new MockTime();
-        Cluster cluster = TestUtils.singletonCluster("topic", 1);
-        Node node = cluster.nodes().get(0);
-
+        MetadataResponse initialUpdateResponse = TestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
         Metadata metadata = new Metadata(0, Long.MAX_VALUE, true);
-        metadata.update(cluster, Collections.emptySet(), time.milliseconds());
+        metadata.update(initialUpdateResponse, time.milliseconds());
 
         MockClient client = new MockClient(time, metadata);
-        client.setNode(node);
 
         Producer<String, String> producer = new KafkaProducer<>(configs, new StringSerializer(), new StringSerializer(),
                 metadata, client, null, time);
@@ -651,14 +636,11 @@ public void testSendToInvalidTopic() throws Exception {
         configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "15000");
 
         Time time = new MockTime();
-        Cluster cluster = TestUtils.singletonCluster();
-        Node node = cluster.nodes().get(0);
-
+        MetadataResponse initialUpdateResponse = TestUtils.metadataUpdateWith(1, emptyMap());
         Metadata metadata = new Metadata(0, Long.MAX_VALUE, true);
-        metadata.update(cluster, Collections.emptySet(), time.milliseconds());
+        metadata.update(initialUpdateResponse, time.milliseconds());
 
         MockClient client = new MockClient(time, metadata);
-        client.setNode(node);
 
         Producer<String, String> producer = new KafkaProducer<>(configs, new StringSerializer(), new StringSerializer(),
                 metadata, client, null, time);
@@ -666,20 +648,19 @@ public void testSendToInvalidTopic() throws Exception {
         String invalidTopicName = "topic abc"; // Invalid topic name due to space
         ProducerRecord<String, String> record = new ProducerRecord<>(invalidTopicName, "HelloKafka");
 
-        Set<String> invalidTopic = new HashSet<>();
-        invalidTopic.add(invalidTopicName);
-        Cluster metaDataUpdateResponseCluster = new Cluster(cluster.clusterResource().clusterId(),
-                cluster.nodes(),
-                new ArrayList<>(0),
-                Collections.emptySet(),
-                invalidTopic,
-                cluster.internalTopics(),
-                cluster.controller());
-        client.prepareMetadataUpdate(metaDataUpdateResponseCluster, Collections.emptySet());
+        List<MetadataResponse.TopicMetadata> topicMetadata = new ArrayList<>();
+        topicMetadata.add(new MetadataResponse.TopicMetadata(Errors.INVALID_TOPIC_EXCEPTION,
+                invalidTopicName, false, Collections.emptyList()));
+        MetadataResponse updateResponse = new MetadataResponse(
+                new ArrayList<>(initialUpdateResponse.brokers()),
+                initialUpdateResponse.clusterId(),
+                initialUpdateResponse.controller().id(),
+                topicMetadata);
+        client.prepareMetadataUpdate(updateResponse);
 
         Future<RecordMetadata> future = producer.send(record);
 
-        assertEquals("Cluster has incorrect invalid topic list", metaDataUpdateResponseCluster.invalidTopics(),
+        assertEquals("Cluster has incorrect invalid topic list.", Collections.singleton(invalidTopicName),
                 metadata.fetch().invalidTopics());
         TestUtils.assertFutureError(future, InvalidTopicException.class);
 
@@ -697,12 +678,10 @@ public void testCloseWhenWaitingForMetadataUpdate() throws InterruptedException
         // return with a KafkaException.
         String topicName = "test";
         Time time = new MockTime();
-        Cluster cluster = TestUtils.singletonCluster();
-        Node node = cluster.nodes().get(0);
-        Metadata metadata = new Metadata(0, Long.MAX_VALUE, false);
-        metadata.update(cluster, Collections.emptySet(), time.milliseconds());
+        MetadataResponse initialUpdateResponse = TestUtils.metadataUpdateWith(1, emptyMap());
+        Metadata metadata = new Metadata(0, Long.MAX_VALUE, true);
+        metadata.update(initialUpdateResponse, time.milliseconds());
         MockClient client = new MockClient(time, metadata);
-        client.setNode(node);
 
         Producer<String, String> producer = new KafkaProducer<>(configs, new StringSerializer(), new StringSerializer(),
                 metadata, client, null, time);
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 8a8ddd35836..606637e4898 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -73,6 +73,7 @@
 import org.apache.kafka.common.requests.FindCoordinatorResponse;
 import org.apache.kafka.common.requests.InitProducerIdRequest;
 import org.apache.kafka.common.requests.InitProducerIdResponse;
+import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.requests.ProduceRequest;
 import org.apache.kafka.common.requests.ProduceResponse;
 import org.apache.kafka.common.requests.ResponseHeader;
@@ -115,11 +116,10 @@
     private TopicPartition tp0 = new TopicPartition("test", 0);
     private TopicPartition tp1 = new TopicPartition("test", 1);
     private MockTime time = new MockTime();
-    private MockClient client = new MockClient(time);
     private int batchSize = 16 * 1024;
     private Metadata metadata = new Metadata(0, Long.MAX_VALUE, true, true, new ClusterResourceListeners());
+    private MockClient client = new MockClient(time, metadata);
     private ApiVersions apiVersions = new ApiVersions();
-    private Cluster cluster = TestUtils.singletonCluster("test", 2);
     private Metrics metrics = null;
     private RecordAccumulator accumulator = null;
     private Sender sender = null;
@@ -128,7 +128,6 @@
 
     @Before
     public void setup() {
-        client.setNode(cluster.nodes().get(0));
         setupWithTransactionState(null);
     }
 
@@ -395,8 +394,8 @@ public void testSendInOrder() throws Exception {
             Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
                     senderMetrics, time, REQUEST_TIMEOUT, 50, null, apiVersions);
             // Create a two broker cluster, with partition 0 on broker 0 and partition 1 on broker 1
-            Cluster cluster1 = TestUtils.clusterWith(2, "test", 2);
-            metadata.update(cluster1, Collections.emptySet(), time.milliseconds());
+            MetadataResponse metadataUpdate1 = TestUtils.metadataUpdateWith(2, Collections.singletonMap("test", 2));
+            client.prepareMetadataUpdate(metadataUpdate1);
 
             // Send the first message.
             TopicPartition tp2 = new TopicPartition("test", 1);
@@ -416,8 +415,8 @@ public void testSendInOrder() throws Exception {
             accumulator.append(tp2, 0L, "key2".getBytes(), "value2".getBytes(), null, null, MAX_BLOCK_TIMEOUT);
 
             // Update metadata before sender receives response from broker 0. Now partition 2 moves to broker 0
-            Cluster cluster2 = TestUtils.singletonCluster("test", 2);
-            metadata.update(cluster2, Collections.emptySet(), time.milliseconds());
+            MetadataResponse metadataUpdate2 = TestUtils.metadataUpdateWith(1, Collections.singletonMap("test", 2));
+            client.prepareMetadataUpdate(metadataUpdate2);
             // Sender should not send the second message to node 0.
             assertEquals(1, sender.inFlightBatches(tp2).size());
             sender.run(time.milliseconds());  // receive the response for the previous send, and send the new batch
@@ -458,9 +457,9 @@ public void onCompletion(RecordMetadata metadata, Exception exception) {
         // Advance the clock to expire the first batch.
         time.sleep(10000);
 
-        Node clusterNode = this.cluster.nodes().get(0);
+        Node clusterNode = metadata.fetch().nodes().get(0);
         Map<Integer, List<ProducerBatch>> drainedBatches =
-            accumulator.drain(cluster, Collections.singleton(clusterNode), Integer.MAX_VALUE, time.milliseconds());
+            accumulator.drain(metadata.fetch(), Collections.singleton(clusterNode), Integer.MAX_VALUE, time.milliseconds());
         sender.addToInflightBatches(drainedBatches);
 
         // Disconnect the target node for the pending produce request. This will ensure that sender will try to
@@ -484,12 +483,12 @@ public void onCompletion(RecordMetadata metadata, Exception exception) {
     @Test
     public void testMetadataTopicExpiry() throws Exception {
         long offset = 0;
-        metadata.update(Cluster.empty(), Collections.<String>emptySet(), time.milliseconds());
+        client.updateMetadata(TestUtils.metadataUpdateWith(1, Collections.emptyMap()));
 
         Future<RecordMetadata> future = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
         sender.run(time.milliseconds());
         assertTrue("Topic not added to metadata", metadata.containsTopic(tp0.topic()));
-        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
+        client.updateMetadata(TestUtils.metadataUpdateWith(1, Collections.singletonMap("test", 2)));
         sender.run(time.milliseconds());  // send produce request
         client.respond(produceResponse(tp0, offset++, Errors.NONE, 0));
         sender.run(time.milliseconds());
@@ -501,12 +500,12 @@ public void testMetadataTopicExpiry() throws Exception {
 
         assertTrue("Topic not retained in metadata list", metadata.containsTopic(tp0.topic()));
         time.sleep(Metadata.TOPIC_EXPIRY_MS);
-        metadata.update(Cluster.empty(), Collections.<String>emptySet(), time.milliseconds());
+        client.updateMetadata(TestUtils.metadataUpdateWith(1, Collections.emptyMap()));
         assertFalse("Unused topic has not been expired", metadata.containsTopic(tp0.topic()));
         future = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
         sender.run(time.milliseconds());
         assertTrue("Topic not added to metadata", metadata.containsTopic(tp0.topic()));
-        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
+        client.updateMetadata(TestUtils.metadataUpdateWith(1, Collections.singletonMap("test", 2)));
         sender.run(time.milliseconds());  // send produce request
         client.respond(produceResponse(tp0, offset++, Errors.NONE, 0));
         sender.run(time.milliseconds());
@@ -522,7 +521,6 @@ public void testInitProducerIdRequest() throws Exception {
         final long producerId = 343434L;
         TransactionManager transactionManager = new TransactionManager();
         setupWithTransactionState(transactionManager);
-        client.setNode(new Node(1, "localhost", 33343));
         prepareAndReceiveInitProducerId(producerId, Errors.NONE);
         assertTrue(transactionManager.hasProducerId());
         assertEquals(producerId, transactionManager.producerIdAndEpoch().producerId);
@@ -534,7 +532,6 @@ public void testClusterAuthorizationExceptionInInitProducerIdRequest() throws Ex
         final long producerId = 343434L;
         TransactionManager transactionManager = new TransactionManager();
         setupWithTransactionState(transactionManager);
-        client.setNode(new Node(1, "localhost", 33343));
         prepareAndReceiveInitProducerId(producerId, Errors.CLUSTER_AUTHORIZATION_FAILED);
         assertFalse(transactionManager.hasProducerId());
         assertTrue(transactionManager.hasError());
@@ -988,7 +985,7 @@ public void testExpiryOfUnsentBatchesShouldNotCauseUnresolvedSequences() throws
 
         // Send first ProduceRequest
         Future<RecordMetadata> request1 = accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
-        Node node = this.cluster.nodes().get(0);
+        Node node = metadata.fetch().nodes().get(0);
         time.sleep(10000L);
         client.disconnect(node.idString());
         client.blackout(node, 10);
@@ -1024,7 +1021,7 @@ public void testExpiryOfFirstBatchShouldNotCauseUnresolvedSequencesIfFutureBatch
         sender.run(time.milliseconds());  // receive first response
         assertEquals(1, sender.inFlightBatches(tp0).size());
 
-        Node node = this.cluster.nodes().get(0);
+        Node node = metadata.fetch().nodes().get(0);
         // We add 600 millis to expire the first batch but not the second.
         // Note deliveryTimeoutMs is 1500.
         time.sleep(600L);
@@ -1088,7 +1085,7 @@ public void testExpiryOfFirstBatchShouldCauseResetIfFutureBatchesFail() throws E
         sendIdempotentProducerResponse(0, tp0, Errors.NOT_LEADER_FOR_PARTITION, -1);
         sender.run(time.milliseconds());  // receive first response
 
-        Node node = this.cluster.nodes().get(0);
+        Node node = metadata.fetch().nodes().get(0);
         time.sleep(1000L);
         client.disconnect(node.idString());
         client.blackout(node, 10);
@@ -1136,7 +1133,7 @@ public void testExpiryOfAllSentBatchesShouldCauseUnresolvedSequences() throws Ex
         sender.run(time.milliseconds());  // receive response
         assertEquals(1L, transactionManager.sequenceNumber(tp0).longValue());
 
-        Node node = this.cluster.nodes().get(0);
+        Node node = metadata.fetch().nodes().get(0);
         time.sleep(15000L);
         client.disconnect(node.idString());
         client.blackout(node, 10);
@@ -1160,7 +1157,6 @@ public void testResetOfProducerStateShouldAllowQueuedBatchesToDrain() throws Exc
         TransactionManager transactionManager = new TransactionManager();
         transactionManager.setProducerIdAndEpoch(new ProducerIdAndEpoch(producerId, (short) 0));
         setupWithTransactionState(transactionManager);
-        client.setNode(new Node(1, "localhost", 33343));
 
         int maxRetries = 10;
         Metrics m = new Metrics();
@@ -1206,7 +1202,6 @@ public void testBatchesDrainedWithOldProducerIdShouldFailWithOutOfOrderSequenceO
         TransactionManager transactionManager = new TransactionManager();
         transactionManager.setProducerIdAndEpoch(new ProducerIdAndEpoch(producerId, (short) 0));
         setupWithTransactionState(transactionManager);
-        client.setNode(new Node(1, "localhost", 33343));
 
         int maxRetries = 10;
         Metrics m = new Metrics();
@@ -1576,7 +1571,6 @@ public void testClusterAuthorizationExceptionInProduceRequest() throws Exception
         TransactionManager transactionManager = new TransactionManager();
         setupWithTransactionState(transactionManager);
 
-        client.setNode(new Node(1, "localhost", 33343));
         prepareAndReceiveInitProducerId(producerId, Errors.NONE);
         assertTrue(transactionManager.hasProducerId());
 
@@ -1604,7 +1598,6 @@ public void testCancelInFlightRequestAfterFatalError() throws Exception {
         TransactionManager transactionManager = new TransactionManager();
         setupWithTransactionState(transactionManager);
 
-        client.setNode(new Node(1, "localhost", 33343));
         prepareAndReceiveInitProducerId(producerId, Errors.NONE);
         assertTrue(transactionManager.hasProducerId());
 
@@ -1647,7 +1640,6 @@ public void testUnsupportedForMessageFormatInProduceRequest() throws Exception {
         TransactionManager transactionManager = new TransactionManager();
         setupWithTransactionState(transactionManager);
 
-        client.setNode(new Node(1, "localhost", 33343));
         prepareAndReceiveInitProducerId(producerId, Errors.NONE);
         assertTrue(transactionManager.hasProducerId());
 
@@ -1673,7 +1665,6 @@ public void testUnsupportedVersionInProduceRequest() throws Exception {
         TransactionManager transactionManager = new TransactionManager();
         setupWithTransactionState(transactionManager);
 
-        client.setNode(new Node(1, "localhost", 33343));
         prepareAndReceiveInitProducerId(producerId, Errors.NONE);
         assertTrue(transactionManager.hasProducerId());
 
@@ -1700,7 +1691,6 @@ public void testSequenceNumberIncrement() throws InterruptedException {
         TransactionManager transactionManager = new TransactionManager();
         transactionManager.setProducerIdAndEpoch(new ProducerIdAndEpoch(producerId, (short) 0));
         setupWithTransactionState(transactionManager);
-        client.setNode(new Node(1, "localhost", 33343));
 
         int maxRetries = 10;
         Metrics m = new Metrics();
@@ -1745,7 +1735,6 @@ public void testAbortRetryWhenProducerIdChanges() throws InterruptedException {
         TransactionManager transactionManager = new TransactionManager();
         transactionManager.setProducerIdAndEpoch(new ProducerIdAndEpoch(producerId, (short) 0));
         setupWithTransactionState(transactionManager);
-        client.setNode(new Node(1, "localhost", 33343));
 
         int maxRetries = 10;
         Metrics m = new Metrics();
@@ -1784,7 +1773,6 @@ public void testResetWhenOutOfOrderSequenceReceived() throws InterruptedExceptio
         TransactionManager transactionManager = new TransactionManager();
         transactionManager.setProducerIdAndEpoch(new ProducerIdAndEpoch(producerId, (short) 0));
         setupWithTransactionState(transactionManager);
-        client.setNode(new Node(1, "localhost", 33343));
 
         int maxRetries = 10;
         Metrics m = new Metrics();
@@ -1853,8 +1841,8 @@ private void testSplitBatchAndSend(TransactionManager txnManager,
             Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
                     senderMetrics, time, REQUEST_TIMEOUT, 1000L, txnManager, new ApiVersions());
             // Create a two broker cluster, with partition 0 on broker 0 and partition 1 on broker 1
-            Cluster cluster1 = TestUtils.clusterWith(2, topic, 2);
-            metadata.update(cluster1, Collections.<String>emptySet(), time.milliseconds());
+            MetadataResponse metadataUpdate1 = TestUtils.metadataUpdateWith(2, Collections.singletonMap(topic, 2));
+            client.prepareMetadataUpdate(metadataUpdate1);
             // Send the first message.
             Future<RecordMetadata> f1 =
                     accumulator.append(tp, 0L, "key1".getBytes(), new byte[batchSize / 2], null, null, MAX_BLOCK_TIMEOUT).future;
@@ -2068,7 +2056,7 @@ public void testExpiredBatchDoesNotSplitOnMessageTooLargeError() throws Exceptio
 
     @Test
     public void testResetNextBatchExpiry() throws Exception {
-        client = spy(new MockClient(time));
+        client = spy(new MockClient(time, metadata));
 
         setupWithTransactionState(null);
 
@@ -2184,25 +2172,20 @@ private void setupWithTransactionState(TransactionManager transactionManager) {
     }
 
     private void setupWithTransactionState(TransactionManager transactionManager, boolean guaranteeOrder, BufferPool customPool) {
+        long deliveryTimeoutMs = 1500L;
         long totalSize = 1024 * 1024;
         String metricGrpName = "producer-metrics";
-        Map<String, String> metricTags = new LinkedHashMap<>();
-        metricTags.put("client-id", CLIENT_ID);
-        MetricConfig metricConfig = new MetricConfig().tags(metricTags);
+        MetricConfig metricConfig = new MetricConfig().tags(Collections.singletonMap("client-id", CLIENT_ID));
         this.metrics = new Metrics(metricConfig, time);
         BufferPool pool = (customPool == null) ? new BufferPool(totalSize, batchSize, metrics, time, metricGrpName) : customPool;
-        setupWithTransactionState(transactionManager, guaranteeOrder, metricTags, pool);
-    }
 
-    private void setupWithTransactionState(TransactionManager transactionManager, boolean guaranteeOrder, Map<String, String> metricTags, BufferPool pool) {
-        long deliveryTimeoutMs = 1500L;
-        String metricGrpName = "producer-metrics";
         this.accumulator = new RecordAccumulator(logContext, batchSize, CompressionType.NONE, 0L, 0L,
-            deliveryTimeoutMs, metrics, metricGrpName, time, apiVersions, transactionManager, pool);
+                deliveryTimeoutMs, metrics, metricGrpName, time, apiVersions, transactionManager, pool);
         this.senderMetricsRegistry = new SenderMetricsRegistry(this.metrics);
         this.sender = new Sender(logContext, this.client, this.metadata, this.accumulator, guaranteeOrder, MAX_REQUEST_SIZE, ACKS_ALL,
-            Integer.MAX_VALUE, this.senderMetricsRegistry, this.time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
-        this.metadata.update(this.cluster, Collections.emptySet(), time.milliseconds());
+                Integer.MAX_VALUE, this.senderMetricsRegistry, this.time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
+
+        this.client.updateMetadata(TestUtils.metadataUpdateWith(1, Collections.singletonMap("test", 2)));
     }
 
     private void assertSendFailure(Class<? extends RuntimeException> expectedError) throws Exception {
@@ -2244,7 +2227,7 @@ private void doInitTransactions(TransactionManager transactionManager, ProducerI
     }
 
     private void prepareFindCoordinatorResponse(Errors error) {
-        client.prepareResponse(new FindCoordinatorResponse(error, cluster.nodes().get(0)));
+        client.prepareResponse(new FindCoordinatorResponse(error, metadata.fetch().nodes().get(0)));
     }
 
     private void prepareInitPidResponse(Errors error, long pid, short epoch) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index cf730b9cb12..72c0a0b55a0 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -102,11 +102,10 @@
     private TopicPartition tp0 = new TopicPartition(topic, 0);
     private TopicPartition tp1 = new TopicPartition(topic, 1);
     private MockTime time = new MockTime();
-    private MockClient client = new MockClient(time);
-
     private Metadata metadata = new Metadata(0, Long.MAX_VALUE, true, true, new ClusterResourceListeners());
+    private MockClient client = new MockClient(time, metadata);
+
     private ApiVersions apiVersions = new ApiVersions();
-    private Cluster cluster = TestUtils.singletonCluster("test", 2);
     private RecordAccumulator accumulator = null;
     private Sender sender = null;
     private TransactionManager transactionManager = null;
@@ -133,8 +132,7 @@ public void setup() {
                 new BufferPool(totalSize, batchSize, metrics, time, metricGrpName));
         this.sender = new Sender(logContext, this.client, this.metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL,
                 MAX_RETRIES, senderMetrics, this.time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
-        this.metadata.update(this.cluster, Collections.<String>emptySet(), time.milliseconds());
-        client.setNode(brokerNode);
+        this.client.updateMetadata(TestUtils.metadataUpdateWith(1, singletonMap("test", 2)));
     }
 
     @Test
@@ -2042,7 +2040,7 @@ public void testTransitionToAbortableErrorOnBatchExpiry() throws InterruptedExce
         time.sleep(10000);
         // Disconnect the target node for the pending produce request. This will ensure that sender will try to
         // expire the batch.
-        Node clusterNode = this.cluster.nodes().get(0);
+        Node clusterNode = metadata.fetch().nodes().get(0);
         client.disconnect(clusterNode.idString());
         client.blackout(clusterNode, 100);
 
@@ -2098,7 +2096,7 @@ public void testTransitionToAbortableErrorOnMultipleBatchExpiry() throws Interru
         time.sleep(10000);
         // Disconnect the target node for the pending produce request. This will ensure that sender will try to
         // expire the batch.
-        Node clusterNode = this.cluster.nodes().get(0);
+        Node clusterNode = metadata.fetch().nodes().get(0);
         client.disconnect(clusterNode.idString());
         client.blackout(clusterNode, 100);
 
@@ -2155,7 +2153,7 @@ public void testDropCommitOnBatchExpiry() throws InterruptedException, Execution
         time.sleep(10000);
         // Disconnect the target node for the pending produce request. This will ensure that sender will try to
         // expire the batch.
-        Node clusterNode = this.cluster.nodes().get(0);
+        Node clusterNode = metadata.fetch().nodes().get(0);
         client.disconnect(clusterNode.idString());
         client.blackout(clusterNode, 100);
 
@@ -2226,7 +2224,7 @@ public void testTransitionToFatalErrorWhenRetriedBatchIsExpired() throws Interru
         time.sleep(10000);
         // Disconnect the target node for the pending produce request. This will ensure that sender will try to
         // expire the batch.
-        Node clusterNode = this.cluster.nodes().get(0);
+        Node clusterNode = metadata.fetch().nodes().get(0);
         client.disconnect(clusterNode.idString());
         client.blackout(clusterNode, 100);
 
diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
index afb342bb151..50d1eebbd72 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
@@ -21,9 +21,12 @@
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.internals.Topic;
 import org.apache.kafka.common.network.NetworkReceive;
 import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.requests.RequestHeader;
 import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
@@ -42,15 +45,16 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.function.Supplier;
-import java.util.concurrent.ExecutionException;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import static java.util.Arrays.asList;
 import static org.junit.Assert.assertEquals;
@@ -70,9 +74,6 @@
     public static final String DIGITS = "0123456789";
     public static final String LETTERS_AND_DIGITS = LETTERS + DIGITS;
 
-    public static final String GROUP_METADATA_TOPIC_NAME = "__consumer_offsets";
-    public static final Set<String> INTERNAL_TOPICS = Collections.singleton(GROUP_METADATA_TOPIC_NAME);
-
     /* A consistent random number generator to make tests repeatable */
     public static final Random SEEDED_RANDOM = new Random(192348092834L);
     public static final Random RANDOM = new Random();
@@ -105,7 +106,52 @@ public static Cluster clusterWith(final int nodes, final Map<String, Integer> to
             for (int i = 0; i < partitions; i++)
                 parts.add(new PartitionInfo(topic, i, ns[i % ns.length], ns, ns));
         }
-        return new Cluster("kafka-cluster", asList(ns), parts, Collections.<String>emptySet(), INTERNAL_TOPICS);
+        return new Cluster("kafka-cluster", asList(ns), parts, Collections.emptySet(), Topic.INTERNAL_TOPICS);
+    }
+
+    public static MetadataResponse metadataUpdateWith(final int numNodes,
+                                                      final Map<String, Integer> topicPartitionCounts) {
+        return metadataUpdateWith("kafka-cluster", numNodes, topicPartitionCounts);
+    }
+
+    public static MetadataResponse metadataUpdateWith(final String clusterId,
+                                                      final int numNodes,
+                                                      final Map<String, Integer> topicPartitionCounts) {
+        return metadataUpdateWith(clusterId, numNodes, Collections.emptyMap(), topicPartitionCounts);
+    }
+
+    public static MetadataResponse metadataUpdateWith(final String clusterId,
+                                                      final int numNodes,
+                                                      final Map<String, Errors> topicErrors,
+                                                      final Map<String, Integer> topicPartitionCounts) {
+        final List<Node> nodes = new ArrayList<>(numNodes);
+        for (int i = 0; i < numNodes; i++)
+            nodes.add(new Node(i, "localhost", 1969 + i));
+
+        List<MetadataResponse.TopicMetadata> topicMetadata = new ArrayList<>();
+        for (Map.Entry<String, Integer> topicPartitionCountEntry : topicPartitionCounts.entrySet()) {
+            String topic = topicPartitionCountEntry.getKey();
+            int numPartitions = topicPartitionCountEntry.getValue();
+
+            List<MetadataResponse.PartitionMetadata> partitionMetadata = new ArrayList<>(numPartitions);
+            for (int i = 0; i < numPartitions; i++) {
+                Node leader = nodes.get(i % nodes.size());
+                List<Node> replicas = Collections.singletonList(leader);
+                partitionMetadata.add(new MetadataResponse.PartitionMetadata(
+                        Errors.NONE, i, leader, Optional.empty(), replicas, replicas, replicas));
+            }
+
+            topicMetadata.add(new MetadataResponse.TopicMetadata(Errors.NONE, topic,
+                    Topic.isInternal(topic), partitionMetadata));
+        }
+
+        for (Map.Entry<String, Errors> topicErrorEntry : topicErrors.entrySet()) {
+            String topic = topicErrorEntry.getKey();
+            topicMetadata.add(new MetadataResponse.TopicMetadata(topicErrorEntry.getValue(), topic,
+                    Topic.isInternal(topic), Collections.emptyList()));
+        }
+
+        return new MetadataResponse(nodes, clusterId, 0, topicMetadata);
     }
 
     public static Cluster clusterWith(final int nodes, final String topic, final int partitions) {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
index cb62fb6c99d..ed48d572ec0 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
@@ -23,7 +23,6 @@
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.NetworkClient;
 import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
-import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.MetricConfig;
@@ -99,7 +98,7 @@ public WorkerGroupMember(DistributedConfig config,
             List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(
                     config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG),
                     config.getString(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG));
-            this.metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), 0);
+            this.metadata.bootstrap(addresses, time.milliseconds());
             String metricGrpPrefix = "connect";
             ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config, time);
             NetworkClient netClient = new NetworkClient(
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
index ede6c71611d..7ccb68c08fc 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
@@ -19,7 +19,6 @@
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.MockClient;
 import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
-import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.Errors;
@@ -74,8 +73,7 @@
     private long retryBackoffMs = 100;
     private MockTime time;
     private MockClient client;
-    private Cluster cluster = TestUtils.singletonCluster("topic", 1);
-    private Node node = cluster.nodes().get(0);
+    private Node node;
     private Metadata metadata;
     private Metrics metrics;
     private ConsumerNetworkClient consumerClient;
@@ -92,16 +90,15 @@ public void setup() {
         LogContext loggerFactory = new LogContext();
 
         this.time = new MockTime();
-        this.client = new MockClient(time);
         this.metadata = new Metadata(0, Long.MAX_VALUE, true);
-        this.metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
+        this.client = new MockClient(time, metadata);
+        this.client.updateMetadata(TestUtils.metadataUpdateWith(1, Collections.singletonMap("topic", 1)));
+        this.node = metadata.fetch().nodes().get(0);
         this.consumerClient = new ConsumerNetworkClient(loggerFactory, client, metadata, time, 100, 1000, heartbeatIntervalMs);
         this.metrics = new Metrics(time);
         this.rebalanceListener = new MockRebalanceListener();
         this.configStorage = PowerMock.createMock(KafkaConfigBackingStore.class);
 
-        client.setNode(node);
-
         this.coordinator = new WorkerCoordinator(
                 loggerFactory,
                 consumerClient,
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
index 5b1e1553559..26c8e71f987 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
@@ -27,6 +27,7 @@
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.ApiError;
 import org.apache.kafka.common.requests.CreateTopicsResponse;
+import org.apache.kafka.common.utils.MockTime;
 import org.junit.Test;
 
 import java.util.Collections;
@@ -49,8 +50,7 @@
     public void returnNullWithApiVersionMismatch() {
         final NewTopic newTopic = TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build();
         Cluster cluster = createCluster(1);
-        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
-            env.kafkaClient().setNode(cluster.controller());
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
             env.kafkaClient().prepareResponse(createTopicResponseWithUnsupportedVersion(newTopic));
             TopicAdmin admin = new TopicAdmin(null, env.adminClient());
@@ -63,8 +63,7 @@ public void returnNullWithApiVersionMismatch() {
     public void returnNullWithClusterAuthorizationFailure() {
         final NewTopic newTopic = TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build();
         Cluster cluster = createCluster(1);
-        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
-            env.kafkaClient().setNode(cluster.nodes().iterator().next());
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) {
             env.kafkaClient().prepareResponse(createTopicResponseWithClusterAuthorizationException(newTopic));
             TopicAdmin admin = new TopicAdmin(null, env.adminClient());
             boolean created = admin.createTopic(newTopic);
diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala
index 5e0f7c87f59..ae2ffb16b6e 100644
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -439,8 +439,7 @@ object AdminClient {
     val brokerUrls = config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)
     val clientDnsLookup = config.getString(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG)
     val brokerAddresses = ClientUtils.parseAndValidateAddresses(brokerUrls, clientDnsLookup)
-    val bootstrapCluster = Cluster.bootstrap(brokerAddresses)
-    metadata.update(bootstrapCluster, Collections.emptySet(), 0)
+    metadata.bootstrap(brokerAddresses, time.milliseconds())
 
     val clientId = "admin-" + AdminClientIdSequence.getAndIncrement()
 
@@ -482,6 +481,6 @@ object AdminClient {
       requestTimeoutMs,
       retryBackoffMs,
       highLevelClient,
-      bootstrapCluster.nodes.asScala.toList)
+      metadata.fetch.nodes.asScala.toList)
   }
 }
diff --git a/core/src/test/scala/unit/kafka/server/epoch/util/ReplicaFetcherMockBlockingSend.scala b/core/src/test/scala/unit/kafka/server/epoch/util/ReplicaFetcherMockBlockingSend.scala
index b7c037ef86b..8c7e0dc390e 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/util/ReplicaFetcherMockBlockingSend.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/util/ReplicaFetcherMockBlockingSend.scala
@@ -16,8 +16,12 @@
   */
 package kafka.server.epoch.util
 
+import java.util
+import java.util.Collections
+
 import kafka.cluster.BrokerEndPoint
 import kafka.server.BlockingSend
+import org.apache.kafka.clients.MockClient.MockMetadataUpdater
 import org.apache.kafka.clients.{ClientRequest, ClientResponse, MockClient}
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.Records
@@ -35,7 +39,11 @@ import org.apache.kafka.common.{Node, TopicPartition}
   * setOffsetsForNextResponse
   */
 class ReplicaFetcherMockBlockingSend(offsets: java.util.Map[TopicPartition, EpochEndOffset], destination: BrokerEndPoint, time: Time) extends BlockingSend {
-  private val client = new MockClient(new SystemTime)
+  private val client = new MockClient(new SystemTime, new MockMetadataUpdater {
+    override def fetchNodes(): util.List[Node] = Collections.emptyList()
+    override def isUpdateNeeded: Boolean = false
+    override def update(time: Time, update: MockClient.MetadataUpdate): Unit = {}
+  })
   var fetchCount = 0
   var epochFetchCount = 0
   var lastUsedOffsetForLeaderEpochVersion = -1
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
index bb199b73d57..cc9a2ffa495 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
@@ -24,6 +24,7 @@
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.config.SslConfigs;
 import org.apache.kafka.common.config.types.Password;
+import org.apache.kafka.common.internals.Topic;
 import org.apache.kafka.common.serialization.LongDeserializer;
 import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.Serdes;
@@ -604,9 +605,11 @@ private void cleanGlobal(final boolean withIntermediateTopics,
     private void assertInternalTopicsGotDeleted(final String intermediateUserTopic) throws Exception {
         // do not use list topics request, but read from the embedded cluster's zookeeper path directly to confirm
         if (intermediateUserTopic != null) {
-            cluster.waitForRemainingTopics(30000, INPUT_TOPIC, OUTPUT_TOPIC, OUTPUT_TOPIC_2, OUTPUT_TOPIC_2_RERUN, TestUtils.GROUP_METADATA_TOPIC_NAME, intermediateUserTopic);
+            cluster.waitForRemainingTopics(30000, INPUT_TOPIC, OUTPUT_TOPIC, OUTPUT_TOPIC_2, OUTPUT_TOPIC_2_RERUN,
+                    Topic.GROUP_METADATA_TOPIC_NAME, intermediateUserTopic);
         } else {
-            cluster.waitForRemainingTopics(30000, INPUT_TOPIC, OUTPUT_TOPIC, OUTPUT_TOPIC_2, OUTPUT_TOPIC_2_RERUN, TestUtils.GROUP_METADATA_TOPIC_NAME);
+            cluster.waitForRemainingTopics(30000, INPUT_TOPIC, OUTPUT_TOPIC, OUTPUT_TOPIC_2, OUTPUT_TOPIC_2_RERUN,
+                    Topic.GROUP_METADATA_TOPIC_NAME);
         }
     }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Clean up internal metadata usage for consistency and extensibility
> ------------------------------------------------------------------
>
>                 Key: KAFKA-7567
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7567
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Jason Gustafson
>            Assignee: Jason Gustafson
>            Priority: Major
>             Fix For: 2.2.0
>
>
> This refactor has two objectives to improve metadata handling logic and testing:
> 1. We want to reduce dependence on the public object `Cluster` for internal metadata propagation since it is not easy to evolve. As an example, we need to propagate leader epochs from the metadata response to `Metadata`, but it is not straightforward to do this without exposing it in `PartitionInfo` since that is what `Cluster` uses internally. By doing this change, we are able to remove some redundant `Cluster` building logic. 
> 2. We want to make the metadata handling in `MockClient` simpler and more consistent. Currently we have mix of metadata update mechanisms which are internally inconsistent with each other and also do not match the implementation in `NetworkClient`.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)