Posted to commits@kafka.apache.org by jg...@apache.org on 2018/10/30 20:20:25 UTC

[kafka] branch trunk updated: KAFKA-7567; Clean up internal metadata usage for consistency and extensibility (#5813)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d71cb54  KAFKA-7567; Clean up internal metadata usage for consistency and extensibility (#5813)
d71cb54 is described below

commit d71cb54672e63d2f0f4b999668336642a9a63a1d
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Tue Oct 30 13:20:13 2018 -0700

    KAFKA-7567; Clean up internal metadata usage for consistency and extensibility (#5813)
    
    This patch makes two improvements to internal metadata handling logic and testing:
    
    1. It reduces dependence on the public `Cluster` object for internal metadata propagation, since `Cluster` is not easy to evolve. For example, we need to propagate leader epochs from the metadata response to `Metadata`, but there is no straightforward way to do so without exposing them in `PartitionInfo`, since that is what `Cluster` uses internally. This change also lets us remove some redundant `Cluster` construction logic (see the first sketch below).
    2. It makes the metadata handling in `MockClient` simpler and more consistent. Previously we had a mix of metadata update mechanisms that were internally inconsistent with one another and did not match the implementation in `NetworkClient` (see the second sketch below).
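    
    A minimal sketch of the new update path for (1), using types and helpers from the diff
    below; the partition counts and timestamps are illustrative:
    
        Metadata metadata = new Metadata(100L, 60 * 60 * 1000L, true);
        metadata.bootstrap(Collections.singletonList(new InetSocketAddress("localhost", 9092)), 0L);
        // Metadata is now updated directly from a MetadataResponse instead of a pre-built Cluster
        MetadataResponse response = TestUtils.metadataUpdateWith(1, Collections.singletonMap("topic", 1));
        metadata.update(response, 100L);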
    
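    A sketch of the simplified MockClient flow for (2), reusing the Metadata instance from the
    sketch above and mirroring the updated tests below:
    
        MockClient client = new MockClient(new MockTime(), metadata);
        // Metadata responses are now injected in the same form that NetworkClient consumes them
        client.updateMetadata(TestUtils.metadataUpdateWith(1, Collections.singletonMap("topic", 1)));
    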
    Reviewers: Rajini Sivaram <ra...@googlemail.com>
---
 .../java/org/apache/kafka/clients/ClientUtils.java |   9 +-
 .../java/org/apache/kafka/clients/Metadata.java    |  90 +++---
 .../org/apache/kafka/clients/NetworkClient.java    |   7 +-
 .../kafka/clients/admin/KafkaAdminClient.java      |  37 ++-
 .../kafka/clients/consumer/KafkaConsumer.java      |   2 +-
 .../kafka/clients/producer/KafkaProducer.java      |   2 +-
 .../main/java/org/apache/kafka/common/Cluster.java |  12 +-
 .../org/apache/kafka/common/PartitionInfo.java     |  10 +-
 .../org/apache/kafka/common/internals/Topic.java   |   2 +-
 .../kafka/common/requests/MetadataResponse.java    |   7 +
 .../org/apache/kafka/clients/ClientUtilsTest.java  |   4 +-
 .../org/apache/kafka/clients/MetadataTest.java     | 159 +++++------
 .../java/org/apache/kafka/clients/MockClient.java  | 149 ++++++----
 .../apache/kafka/clients/NetworkClientTest.java    |  10 +-
 .../clients/admin/AdminClientUnitTestEnv.java      |  55 ++--
 .../kafka/clients/admin/KafkaAdminClientTest.java  | 124 +++------
 .../kafka/clients/consumer/KafkaConsumerTest.java  | 303 +++++++++------------
 .../internals/AbstractCoordinatorTest.java         |  17 +-
 .../internals/ConsumerCoordinatorTest.java         |  90 +++---
 .../internals/ConsumerNetworkClientTest.java       |   4 +-
 .../clients/consumer/internals/FetcherTest.java    | 109 ++++----
 .../kafka/clients/producer/KafkaProducerTest.java  |  93 +++----
 .../clients/producer/internals/SenderTest.java     |  69 ++---
 .../producer/internals/TransactionManagerTest.java |  16 +-
 .../test/java/org/apache/kafka/test/TestUtils.java |  60 +++-
 .../runtime/distributed/WorkerGroupMember.java     |   3 +-
 .../runtime/distributed/WorkerCoordinatorTest.java |  11 +-
 .../apache/kafka/connect/util/TopicAdminTest.java  |   7 +-
 core/src/main/scala/kafka/admin/AdminClient.scala  |   5 +-
 .../util/ReplicaFetcherMockBlockingSend.scala      |  10 +-
 .../integration/AbstractResetIntegrationTest.java  |   7 +-
 31 files changed, 726 insertions(+), 757 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
index fe83c5c..4d93324 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
@@ -45,9 +45,12 @@ public final class ClientUtils {
     private ClientUtils() {
     }
 
-    public static List<InetSocketAddress> parseAndValidateAddresses(List<String> urls, String clientDnsLookup) {
+    public static List<InetSocketAddress> parseAndValidateAddresses(List<String> urls, String clientDnsLookupConfig) {
+        return parseAndValidateAddresses(urls, ClientDnsLookup.forConfig(clientDnsLookupConfig));
+    }
+
+    public static List<InetSocketAddress> parseAndValidateAddresses(List<String> urls, ClientDnsLookup clientDnsLookup) {
         List<InetSocketAddress> addresses = new ArrayList<>();
-        ClientDnsLookup clientDnsLookupBehaviour = ClientDnsLookup.forConfig(clientDnsLookup);
         for (String url : urls) {
             if (url != null && !url.isEmpty()) {
                 try {
@@ -56,7 +59,7 @@ public final class ClientUtils {
                     if (host == null || port == null)
                         throw new ConfigException("Invalid url in " + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG + ": " + url);
 
-                    if (clientDnsLookupBehaviour == ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY) {
+                    if (clientDnsLookup == ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY) {
                         InetAddress[] inetAddresses = InetAddress.getAllByName(host);
                         for (InetAddress inetAddress : inetAddresses) {
                             String resolvedCanonicalName = inetAddress.getCanonicalHostName();
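
For illustration, the enum-typed overload added above can be invoked directly, as the updated
ClientUtilsTest further down does; the bootstrap address here is a placeholder:

    List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(
            Collections.singletonList("localhost:9092"), ClientDnsLookup.DEFAULT);
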
diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index 0abb5c4..1028de7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -18,15 +18,15 @@ package org.apache.kafka.clients;
 
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.internals.ClusterResourceListeners;
-import org.apache.kafka.common.Node;
-import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.internals.ClusterResourceListeners;
+import org.apache.kafka.common.requests.MetadataResponse;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
+import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -64,6 +64,7 @@ public class Metadata implements Closeable {
     private long lastSuccessfulRefreshMs;
     private AuthenticationException authenticationException;
     private Cluster cluster;
+    private Set<String> unavailableTopics = Collections.emptySet();
     private boolean needUpdate;
     /* Topics with expiry time */
     private final Map<String, Long> topics;
@@ -74,7 +75,9 @@ public class Metadata implements Closeable {
     private final boolean topicExpiryEnabled;
     private boolean isClosed;
 
-    public Metadata(long refreshBackoffMs, long metadataExpireMs, boolean allowAutoTopicCreation) {
+    public Metadata(long refreshBackoffMs,
+                    long metadataExpireMs,
+                    boolean allowAutoTopicCreation) {
         this(refreshBackoffMs, metadataExpireMs, allowAutoTopicCreation, false, new ClusterResourceListeners());
     }
 
@@ -88,8 +91,11 @@ public class Metadata implements Closeable {
      * @param topicExpiryEnabled If true, enable expiry of unused topics
      * @param clusterResourceListeners List of ClusterResourceListeners which will receive metadata updates.
      */
-    public Metadata(long refreshBackoffMs, long metadataExpireMs, boolean allowAutoTopicCreation,
-                    boolean topicExpiryEnabled, ClusterResourceListeners clusterResourceListeners) {
+    public Metadata(long refreshBackoffMs,
+                    long metadataExpireMs,
+                    boolean allowAutoTopicCreation,
+                    boolean topicExpiryEnabled,
+                    ClusterResourceListeners clusterResourceListeners) {
         this.refreshBackoffMs = refreshBackoffMs;
         this.metadataExpireMs = metadataExpireMs;
         this.allowAutoTopicCreation = allowAutoTopicCreation;
@@ -231,17 +237,23 @@ public class Metadata implements Closeable {
         return this.topics.containsKey(topic);
     }
 
+    public synchronized void bootstrap(List<InetSocketAddress> addresses, long now) {
+        this.needUpdate = true;
+        this.lastRefreshMs = now;
+        this.lastSuccessfulRefreshMs = now;
+        this.version += 1;
+        this.cluster = Cluster.bootstrap(addresses);
+    }
+
     /**
      * Updates the cluster metadata. If topic expiry is enabled, expiry time
      * is set for topics if required and expired topics are removed from the metadata.
      *
-     * @param newCluster the cluster containing metadata for topics with valid metadata
-     * @param unavailableTopics topics which are non-existent or have one or more partitions whose
-     *        leader is not known
+     * @param metadataResponse metadata response received from the broker
      * @param now current time in milliseconds
      */
-    public synchronized void update(Cluster newCluster, Set<String> unavailableTopics, long now) {
-        Objects.requireNonNull(newCluster, "cluster should not be null");
+    public synchronized void update(MetadataResponse metadataResponse, long now) {
+        Objects.requireNonNull(metadataResponse, "Metadata response cannot be null");
         if (isClosed())
             throw new IllegalStateException("Update requested after metadata close");
 
@@ -264,32 +276,34 @@ public class Metadata implements Closeable {
             }
         }
 
-        for (Listener listener: listeners)
-            listener.onMetadataUpdate(newCluster, unavailableTopics);
-
         String previousClusterId = cluster.clusterResource().clusterId();
 
+        this.cluster = metadataResponse.cluster();
+        this.unavailableTopics = metadataResponse.unavailableTopics();
+
+        fireListeners(cluster, unavailableTopics);
+
         if (this.needMetadataForAllTopics) {
             // the listener may change the interested topics, which could cause another metadata refresh.
             // If we have already fetched all topics, however, another fetch should be unnecessary.
             this.needUpdate = false;
-            this.cluster = getClusterForCurrentTopics(newCluster);
-        } else {
-            this.cluster = newCluster;
+            this.cluster = metadataResponse.cluster(topics.keySet());
         }
 
-        // The bootstrap cluster is guaranteed not to have any useful information
-        if (!newCluster.isBootstrapConfigured()) {
-            String newClusterId = newCluster.clusterResource().clusterId();
-            if (newClusterId == null ? previousClusterId != null : !newClusterId.equals(previousClusterId))
-                log.info("Cluster ID: {}", newClusterId);
-            clusterResourceListeners.onUpdate(newCluster.clusterResource());
-        }
+        String newClusterId = cluster.clusterResource().clusterId();
+        if (newClusterId == null ? previousClusterId != null : !newClusterId.equals(previousClusterId))
+            log.info("Cluster ID: {}", newClusterId);
+        clusterResourceListeners.onUpdate(cluster.clusterResource());
 
         notifyAll();
         log.debug("Updated cluster metadata version {} to {}", this.version, this.cluster);
     }
 
+    private void fireListeners(Cluster newCluster, Set<String> unavailableTopics) {
+        for (Listener listener: listeners)
+            listener.onMetadataUpdate(newCluster, unavailableTopics);
+    }
+
     /**
      * Record an attempt to update the metadata that failed. We need to keep track of this
      * to avoid retrying immediately.
@@ -390,32 +404,4 @@ public class Metadata implements Closeable {
         requestUpdate();
     }
 
-    private Cluster getClusterForCurrentTopics(Cluster cluster) {
-        Set<String> unauthorizedTopics = new HashSet<>();
-        Set<String> invalidTopics = new HashSet<>();
-        Collection<PartitionInfo> partitionInfos = new ArrayList<>();
-        List<Node> nodes = Collections.emptyList();
-        Set<String> internalTopics = Collections.emptySet();
-        Node controller = null;
-        String clusterId = null;
-        if (cluster != null) {
-            clusterId = cluster.clusterResource().clusterId();
-            internalTopics = cluster.internalTopics();
-            unauthorizedTopics.addAll(cluster.unauthorizedTopics());
-            unauthorizedTopics.retainAll(this.topics.keySet());
-
-            invalidTopics.addAll(cluster.invalidTopics());
-            invalidTopics.addAll(this.cluster.invalidTopics());
-
-            for (String topic : this.topics.keySet()) {
-                List<PartitionInfo> partitionInfoList = cluster.partitionsForTopic(topic);
-                if (!partitionInfoList.isEmpty()) {
-                    partitionInfos.addAll(partitionInfoList);
-                }
-            }
-            nodes = cluster.nodes();
-            controller  = cluster.controller();
-        }
-        return new Cluster(clusterId, nodes, partitionInfos, unauthorizedTopics, invalidTopics, internalTopics, controller);
-    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index c6f0c0b..902ef1c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -962,7 +962,6 @@ public class NetworkClient implements KafkaClient {
         @Override
         public void handleCompletedMetadataResponse(RequestHeader requestHeader, long now, MetadataResponse response) {
             this.metadataFetchInProgress = false;
-            Cluster cluster = response.cluster();
 
             // If any partition has leader with missing listeners, log a few for diagnosing broker configuration
             // issues. This could be a transient issue if listeners were added dynamically to brokers.
@@ -984,11 +983,11 @@ public class NetworkClient implements KafkaClient {
 
             // don't update the cluster if there are no valid nodes...the topic we want may still be in the process of being
             // created which means we will get errors and no nodes until it exists
-            if (cluster.nodes().size() > 0) {
-                this.metadata.update(cluster, response.unavailableTopics(), now);
-            } else {
+            if (response.brokers().isEmpty()) {
                 log.trace("Ignoring empty metadata response with correlation id {}.", requestHeader.correlationId());
                 this.metadata.failedUpdate(now, null);
+            } else {
+                this.metadata.update(response, now);
             }
         }
 
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 33a4786..39817df 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -339,6 +339,10 @@ public class KafkaAdminClient extends AdminClient {
             AdminMetadataManager metadataManager = new AdminMetadataManager(logContext,
                 config.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG),
                 config.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG));
+            List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(
+                    config.getList(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG),
+                    config.getString(AdminClientConfig.CLIENT_DNS_LOOKUP_CONFIG));
+            metadataManager.update(Cluster.bootstrap(addresses), time.milliseconds());
             List<MetricsReporter> reporters = config.getConfiguredInstances(AdminClientConfig.METRIC_REPORTER_CLASSES_CONFIG,
                 MetricsReporter.class,
                 Collections.singletonMap(AdminClientConfig.CLIENT_ID_CONFIG, clientId));
@@ -375,25 +379,25 @@ public class KafkaAdminClient extends AdminClient {
             closeQuietly(networkClient, "NetworkClient");
             closeQuietly(selector, "Selector");
             closeQuietly(channelBuilder, "ChannelBuilder");
-            throw new KafkaException("Failed create new KafkaAdminClient", exc);
+            throw new KafkaException("Failed to create new KafkaAdminClient", exc);
         }
     }
 
-    static KafkaAdminClient createInternal(AdminClientConfig config, KafkaClient client, Time time) {
+    static KafkaAdminClient createInternal(AdminClientConfig config,
+                                           AdminMetadataManager metadataManager,
+                                           KafkaClient client,
+                                           Time time) {
         Metrics metrics = null;
         String clientId = generateClientId(config);
 
         try {
             metrics = new Metrics(new MetricConfig(), new LinkedList<>(), time);
             LogContext logContext = createLogContext(clientId);
-            AdminMetadataManager metadataManager = new AdminMetadataManager(logContext,
-                config.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG),
-                config.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG));
             return new KafkaAdminClient(config, clientId, time, metadataManager, metrics,
                 client, null, logContext);
         } catch (Throwable exc) {
             closeQuietly(metrics, "Metrics");
-            throw new KafkaException("Failed create new KafkaAdminClient", exc);
+            throw new KafkaException("Failed to create new KafkaAdminClient", exc);
         }
     }
 
@@ -414,10 +418,6 @@ public class KafkaAdminClient extends AdminClient {
         this.log = logContext.logger(KafkaAdminClient.class);
         this.time = time;
         this.metadataManager = metadataManager;
-        List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(
-            config.getList(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG),
-            config.getString(AdminClientConfig.CLIENT_DNS_LOOKUP_CONFIG));
-        metadataManager.update(Cluster.bootstrap(addresses), time.milliseconds());
         this.metrics = metrics;
         this.client = client;
         this.runnable = new AdminClientRunnable();
@@ -1399,12 +1399,12 @@ public class KafkaAdminClient extends AdminClient {
             @Override
             void handleResponse(AbstractResponse abstractResponse) {
                 MetadataResponse response = (MetadataResponse) abstractResponse;
-                Cluster cluster = response.cluster();
                 Map<String, TopicListing> topicListing = new HashMap<>();
-                for (String topicName : cluster.topics()) {
-                    boolean internal = cluster.internalTopics().contains(topicName);
-                    if (!internal || options.shouldListInternal())
-                        topicListing.put(topicName, new TopicListing(topicName, internal));
+                for (MetadataResponse.TopicMetadata topicMetadata : response.topicMetadata()) {
+                    String topicName = topicMetadata.topic();
+                    boolean isInternal = topicMetadata.isInternal();
+                    if (!topicMetadata.isInternal() || options.shouldListInternal())
+                        topicListing.put(topicName, new TopicListing(topicName, isInternal));
                 }
                 topicListingFuture.complete(topicListing);
             }
@@ -2551,12 +2551,11 @@ public class KafkaAdminClient extends AdminClient {
             @Override
             void handleResponse(AbstractResponse abstractResponse) {
                 MetadataResponse metadataResponse = (MetadataResponse) abstractResponse;
-                Cluster cluster = metadataResponse.cluster();
-
-                if (cluster.nodes().isEmpty())
+                Collection<Node> nodes = metadataResponse.brokers();
+                if (nodes.isEmpty())
                     throw new StaleMetadataException("Metadata fetch failed due to missing broker list");
 
-                HashSet<Node> allNodes = new HashSet<>(cluster.nodes());
+                HashSet<Node> allNodes = new HashSet<>(nodes);
                 final ListConsumerGroupsResults results = new ListConsumerGroupsResults(allNodes, all);
 
                 for (final Node node : allNodes) {
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 714cd94..065e663 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -712,7 +712,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
             List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(
                     config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG),
                     config.getString(ConsumerConfig.CLIENT_DNS_LOOKUP_CONFIG));
-            this.metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), 0);
+            this.metadata.bootstrap(addresses, time.milliseconds());
             String metricGrpPrefix = "consumer";
             ConsumerMetrics metricsRegistry = new ConsumerMetrics(metricsTags.keySet(), "consumer");
             ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config, time);
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index e916771..c9052e3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -416,7 +416,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             } else {
                 this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG),
                     true, true, clusterResourceListeners);
-                this.metadata.update(Cluster.bootstrap(addresses), Collections.emptySet(), time.milliseconds());
+                this.metadata.bootstrap(addresses, time.milliseconds());
             }
             this.errors = this.metrics.sensor("errors");
             this.sender = newSender(logContext, kafkaClient, this.metadata);
diff --git a/clients/src/main/java/org/apache/kafka/common/Cluster.java b/clients/src/main/java/org/apache/kafka/common/Cluster.java
index 24a18db..8a773bc 100644
--- a/clients/src/main/java/org/apache/kafka/common/Cluster.java
+++ b/clients/src/main/java/org/apache/kafka/common/Cluster.java
@@ -79,12 +79,12 @@ public final class Cluster {
      * @param partitions Information about a subset of the topic-partitions this cluster hosts
      */
     public Cluster(String clusterId,
-        Collection<Node> nodes,
-        Collection<PartitionInfo> partitions,
-        Set<String> unauthorizedTopics,
-        Set<String> invalidTopics,
-        Set<String> internalTopics,
-        Node controller) {
+                   Collection<Node> nodes,
+                   Collection<PartitionInfo> partitions,
+                   Set<String> unauthorizedTopics,
+                   Set<String> invalidTopics,
+                   Set<String> internalTopics,
+                   Node controller) {
         this(clusterId, false, nodes, partitions, unauthorizedTopics, invalidTopics, internalTopics, controller);
     }
 
diff --git a/clients/src/main/java/org/apache/kafka/common/PartitionInfo.java b/clients/src/main/java/org/apache/kafka/common/PartitionInfo.java
index 44cd4f4..7ed54f0 100644
--- a/clients/src/main/java/org/apache/kafka/common/PartitionInfo.java
+++ b/clients/src/main/java/org/apache/kafka/common/PartitionInfo.java
@@ -103,10 +103,12 @@ public class PartitionInfo {
     /* Extract the node ids from each item in the array and format for display */
     private String formatNodeIds(Node[] nodes) {
         StringBuilder b = new StringBuilder("[");
-        for (int i = 0; i < nodes.length; i++) {
-            b.append(nodes[i].idString());
-            if (i < nodes.length - 1)
-                b.append(',');
+        if (nodes != null) {
+            for (int i = 0; i < nodes.length; i++) {
+                b.append(nodes[i].idString());
+                if (i < nodes.length - 1)
+                    b.append(',');
+            }
         }
         b.append("]");
         return b.toString();
diff --git a/clients/src/main/java/org/apache/kafka/common/internals/Topic.java b/clients/src/main/java/org/apache/kafka/common/internals/Topic.java
index a5ef335..619477d 100644
--- a/clients/src/main/java/org/apache/kafka/common/internals/Topic.java
+++ b/clients/src/main/java/org/apache/kafka/common/internals/Topic.java
@@ -28,7 +28,7 @@ public class Topic {
     public static final String TRANSACTION_STATE_TOPIC_NAME = "__transaction_state";
     public static final String LEGAL_CHARS = "[a-zA-Z0-9._-]";
 
-    private static final Set<String> INTERNAL_TOPICS = Collections.unmodifiableSet(
+    public static final Set<String> INTERNAL_TOPICS = Collections.unmodifiableSet(
             Utils.mkSet(GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME));
 
     private static final int MAX_NAME_LENGTH = 249;
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
index c78066f..fc67571 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
@@ -368,9 +368,16 @@ public class MetadataResponse extends AbstractResponse {
      * @return the cluster snapshot
      */
     public Cluster cluster() {
+        return cluster(null);
+    }
+
+    public Cluster cluster(Set<String> topicsToRetain) {
         Set<String> internalTopics = new HashSet<>();
         List<PartitionInfo> partitions = new ArrayList<>();
         for (TopicMetadata metadata : topicMetadata) {
+            if (topicsToRetain != null && !topicsToRetain.contains(metadata.topic))
+                continue;
+
             if (metadata.error == Errors.NONE) {
                 if (metadata.isInternal)
                     internalTopics.add(metadata.topic);
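
A quick sketch of the new filtering overload: cluster() keeps every topic in the response,
while cluster(topicsToRetain) drops partitions for any topic outside the given set. Topic
names here are illustrative:

    MetadataResponse response = TestUtils.metadataUpdateWith(1, Collections.singletonMap("topic", 1));
    Cluster full = response.cluster();                                // contains "topic"
    Cluster none = response.cluster(Collections.<String>emptySet());  // retains no topic metadata, brokers only
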
diff --git a/clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java b/clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java
index 227afea..afe5a5d 100644
--- a/clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java
@@ -112,11 +112,11 @@ public class ClientUtilsTest {
     }
 
     private List<InetSocketAddress> checkWithoutLookup(String... url) {
-        return ClientUtils.parseAndValidateAddresses(Arrays.asList(url), ClientDnsLookup.DEFAULT.toString());
+        return ClientUtils.parseAndValidateAddresses(Arrays.asList(url), ClientDnsLookup.DEFAULT);
     }
 
     private List<InetSocketAddress> checkWithLookup(List<String> url) {
-        return ClientUtils.parseAndValidateAddresses(url, ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY.toString());
+        return ClientUtils.parseAndValidateAddresses(url, ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY);
     }
 
 }
diff --git a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
index 969921e..b8c9d64 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
@@ -16,25 +16,26 @@
  */
 package org.apache.kafka.clients;
 
-import java.net.InetSocketAddress;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicReference;
-
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.internals.ClusterResourceListeners;
-import org.apache.kafka.common.Node;
-import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.internals.ClusterResourceListeners;
+import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.test.MockClusterResourceListener;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
 import org.junit.Test;
 
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -54,10 +55,18 @@ public class MetadataTest {
         assertNull("Exception in background thread : " + backgroundError.get(), backgroundError.get());
     }
 
+    private static MetadataResponse emptyMetadataResponse() {
+        return new MetadataResponse(
+                Collections.emptyList(),
+                null,
+                -1,
+                Collections.emptyList());
+    }
+    
     @Test
     public void testMetadata() throws Exception {
         long time = 0;
-        metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
+        metadata.update(emptyMetadataResponse(), time);
         assertFalse("No update needed.", metadata.timeToNextUpdate(time) == 0);
         metadata.requestUpdate();
         assertFalse("Still no updated needed due to backoff", metadata.timeToNextUpdate(time) == 0);
@@ -72,7 +81,8 @@ public class MetadataTest {
         // This simulates the metadata update sequence in KafkaProducer
         while (t1.isAlive() || t2.isAlive()) {
             if (metadata.timeToNextUpdate(time) == 0) {
-                metadata.update(TestUtils.singletonCluster(topic, 1), Collections.<String>emptySet(), time);
+                MetadataResponse response = TestUtils.metadataUpdateWith(1, Collections.singletonMap(topic, 1));
+                metadata.update(response, time);
                 time += refreshBackoffMs;
             }
             Thread.sleep(1);
@@ -87,7 +97,7 @@ public class MetadataTest {
     @Test
     public void testMetadataAwaitAfterClose() throws InterruptedException {
         long time = 0;
-        metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
+        metadata.update(emptyMetadataResponse(), time);
         assertFalse("No update needed.", metadata.timeToNextUpdate(time) == 0);
         metadata.requestUpdate();
         assertFalse("Still no updated needed due to backoff", metadata.timeToNextUpdate(time) == 0);
@@ -105,7 +115,7 @@ public class MetadataTest {
     @Test(expected = IllegalStateException.class)
     public void testMetadataUpdateAfterClose() {
         metadata.close();
-        metadata.update(Cluster.empty(), Collections.<String>emptySet(), 1000);
+        metadata.update(emptyMetadataResponse(), 1000);
     }
 
     private static void checkTimeToNextUpdate(long refreshBackoffMs, long metadataExpireMs) {
@@ -126,7 +136,7 @@ public class MetadataTest {
         assertEquals(0, metadata.timeToNextUpdate(now));
 
         // lastSuccessfulRefreshMs updated to now.
-        metadata.update(Cluster.empty(), Collections.<String>emptySet(), now);
+        metadata.update(emptyMetadataResponse(), now);
 
         // The last update was successful so the remaining time to expire the current metadata should be returned.
         assertEquals(largerOfBackoffAndExpire, metadata.timeToNextUpdate(now));
@@ -137,7 +147,7 @@ public class MetadataTest {
         assertEquals(refreshBackoffMs, metadata.timeToNextUpdate(now));
 
         // Reset needUpdate to false.
-        metadata.update(Cluster.empty(), Collections.<String>emptySet(), now);
+        metadata.update(emptyMetadataResponse(), now);
         assertEquals(largerOfBackoffAndExpire, metadata.timeToNextUpdate(now));
 
         // Both metadataExpireMs and refreshBackoffMs elapsed.
@@ -181,13 +191,13 @@ public class MetadataTest {
         long now = 10000;
 
         // New topic added to fetch set and update requested. It should allow immediate update.
-        metadata.update(Cluster.empty(), Collections.<String>emptySet(), now);
+        metadata.update(emptyMetadataResponse(), now);
         metadata.add("new-topic");
         assertEquals(0, metadata.timeToNextUpdate(now));
 
         // Even though setTopics called, immediate update isn't necessary if the new topic set isn't
         // containing a new topic,
-        metadata.update(Cluster.empty(), Collections.<String>emptySet(), now);
+        metadata.update(emptyMetadataResponse(), now);
         metadata.setTopics(metadata.topics());
         assertEquals(metadataExpireMs, metadata.timeToNextUpdate(now));
 
@@ -196,12 +206,12 @@ public class MetadataTest {
         assertEquals(0, metadata.timeToNextUpdate(now));
 
         // If metadata requested for all topics it should allow immediate update.
-        metadata.update(Cluster.empty(), Collections.<String>emptySet(), now);
+        metadata.update(emptyMetadataResponse(), now);
         metadata.needMetadataForAllTopics(true);
         assertEquals(0, metadata.timeToNextUpdate(now));
 
         // However if metadata is already capable to serve all topics it shouldn't override backoff.
-        metadata.update(Cluster.empty(), Collections.<String>emptySet(), now);
+        metadata.update(emptyMetadataResponse(), now);
         metadata.needMetadataForAllTopics(true);
         assertEquals(metadataExpireMs, metadata.timeToNextUpdate(now));
     }
@@ -216,7 +226,7 @@ public class MetadataTest {
     @Test
     public void testMetadataUpdateWaitTime() throws Exception {
         long time = 0;
-        metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
+        metadata.update(emptyMetadataResponse(), time);
         assertFalse("No update needed.", metadata.timeToNextUpdate(time) == 0);
         // first try with a max wait time of 0 and ensure that this returns back without waiting forever
         try {
@@ -238,7 +248,7 @@ public class MetadataTest {
     @Test
     public void testFailedUpdate() {
         long time = 100;
-        metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
+        metadata.update(emptyMetadataResponse(), time);
 
         assertEquals(100, metadata.timeToNextUpdate(1000));
         metadata.failedUpdate(1100, null);
@@ -247,26 +257,24 @@ public class MetadataTest {
         assertEquals(100, metadata.lastSuccessfulUpdate());
 
         metadata.needMetadataForAllTopics(true);
-        metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
+        metadata.update(emptyMetadataResponse(), time);
         assertEquals(100, metadata.timeToNextUpdate(1000));
     }
 
     @Test
     public void testUpdateWithNeedMetadataForAllTopics() {
         long time = 0;
-        metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
+        metadata.update(emptyMetadataResponse(), time);
         metadata.needMetadataForAllTopics(true);
 
         final List<String> expectedTopics = Collections.singletonList("topic");
         metadata.setTopics(expectedTopics);
-        metadata.update(new Cluster(null,
-                Collections.singletonList(new Node(0, "host1", 1000)),
-                Arrays.asList(
-                    new PartitionInfo("topic", 0, null, null, null),
-                    new PartitionInfo("topic1", 0, null, null, null)),
-                Collections.<String>emptySet(),
-                Collections.<String>emptySet()),
-            Collections.<String>emptySet(), 100);
+
+        Map<String, Integer> partitionCounts = new HashMap<>();
+        partitionCounts.put("topic", 1);
+        partitionCounts.put("topic1", 1);
+        MetadataResponse metadataResponse = TestUtils.metadataUpdateWith(1, partitionCounts);
+        metadata.update(metadataResponse, 100);
 
         assertArrayEquals("Metadata got updated with wrong set of topics.",
             expectedTopics.toArray(), metadata.topics().toArray());
@@ -283,20 +291,15 @@ public class MetadataTest {
         metadata = new Metadata(refreshBackoffMs, metadataExpireMs, true, false, listeners);
 
         String hostName = "www.example.com";
-        Cluster cluster = Cluster.bootstrap(Arrays.asList(new InetSocketAddress(hostName, 9002)));
-        metadata.update(cluster, Collections.<String>emptySet(), time);
+        metadata.bootstrap(Collections.singletonList(new InetSocketAddress(hostName, 9002)), time);
         assertFalse("ClusterResourceListener should not called when metadata is updated with bootstrap Cluster",
                 MockClusterResourceListener.IS_ON_UPDATE_CALLED.get());
 
-        metadata.update(new Cluster(
-                        "dummy",
-                        Arrays.asList(new Node(0, "host1", 1000)),
-                        Arrays.asList(
-                                new PartitionInfo("topic", 0, null, null, null),
-                                new PartitionInfo("topic1", 0, null, null, null)),
-                        Collections.<String>emptySet(),
-                        Collections.<String>emptySet()),
-                Collections.<String>emptySet(), 100);
+        Map<String, Integer> partitionCounts = new HashMap<>();
+        partitionCounts.put("topic", 1);
+        partitionCounts.put("topic1", 1);
+        MetadataResponse metadataResponse = TestUtils.metadataUpdateWith("dummy", 1, partitionCounts);
+        metadata.update(metadataResponse, 100);
 
         assertEquals("MockClusterResourceListener did not get cluster metadata correctly",
                 "dummy", mockClusterListener.clusterResource().clusterId());
@@ -308,7 +311,7 @@ public class MetadataTest {
     public void testListenerGetsNotifiedOfUpdate() {
         long time = 0;
         final Set<String> topics = new HashSet<>();
-        metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
+        metadata.update(emptyMetadataResponse(), time);
         metadata.addListener(new Metadata.Listener() {
             @Override
             public void onMetadataUpdate(Cluster cluster, Set<String> unavailableTopics) {
@@ -317,15 +320,11 @@ public class MetadataTest {
             }
         });
 
-        metadata.update(new Cluster(
-                null,
-                Arrays.asList(new Node(0, "host1", 1000)),
-                Arrays.asList(
-                    new PartitionInfo("topic", 0, null, null, null),
-                    new PartitionInfo("topic1", 0, null, null, null)),
-                Collections.<String>emptySet(),
-                Collections.<String>emptySet()),
-            Collections.<String>emptySet(), 100);
+        Map<String, Integer> partitionCounts = new HashMap<>();
+        partitionCounts.put("topic", 1);
+        partitionCounts.put("topic1", 1);
+        MetadataResponse metadataResponse = TestUtils.metadataUpdateWith("dummy", 1, partitionCounts);
+        metadata.update(metadataResponse, 100);
 
         assertEquals("Listener did not update topics list correctly",
             new HashSet<>(Arrays.asList("topic", "topic1")), topics);
@@ -335,7 +334,7 @@ public class MetadataTest {
     public void testListenerCanUnregister() {
         long time = 0;
         final Set<String> topics = new HashSet<>();
-        metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
+        metadata.update(emptyMetadataResponse(), time);
         final Metadata.Listener listener = new Metadata.Listener() {
             @Override
             public void onMetadataUpdate(Cluster cluster, Set<String> unavailableTopics) {
@@ -345,27 +344,19 @@ public class MetadataTest {
         };
         metadata.addListener(listener);
 
-        metadata.update(new Cluster(
-                "cluster",
-                Collections.singletonList(new Node(0, "host1", 1000)),
-                Arrays.asList(
-                    new PartitionInfo("topic", 0, null, null, null),
-                    new PartitionInfo("topic1", 0, null, null, null)),
-                Collections.<String>emptySet(),
-                Collections.<String>emptySet()),
-            Collections.<String>emptySet(), 100);
+        Map<String, Integer> partitionCounts = new HashMap<>();
+        partitionCounts.put("topic", 1);
+        partitionCounts.put("topic1", 1);
+        MetadataResponse metadataResponse = TestUtils.metadataUpdateWith("dummy", 1, partitionCounts);
+        metadata.update(metadataResponse, 100);
 
         metadata.removeListener(listener);
 
-        metadata.update(new Cluster(
-                "cluster",
-                Arrays.asList(new Node(0, "host1", 1000)),
-                Arrays.asList(
-                    new PartitionInfo("topic2", 0, null, null, null),
-                    new PartitionInfo("topic3", 0, null, null, null)),
-                Collections.<String>emptySet(),
-                Collections.<String>emptySet()),
-            Collections.<String>emptySet(), 100);
+        partitionCounts.clear();
+        partitionCounts.put("topic2", 1);
+        partitionCounts.put("topic3", 1);
+        metadataResponse = TestUtils.metadataUpdateWith("dummy", 1, partitionCounts);
+        metadata.update(metadataResponse, 100);
 
         assertEquals("Listener did not update topics list correctly",
             new HashSet<>(Arrays.asList("topic", "topic1")), topics);
@@ -378,17 +369,17 @@ public class MetadataTest {
         // Test that topic is expired if not used within the expiry interval
         long time = 0;
         metadata.add("topic1");
-        metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
+        metadata.update(emptyMetadataResponse(), time);
         time += Metadata.TOPIC_EXPIRY_MS;
-        metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
+        metadata.update(emptyMetadataResponse(), time);
         assertFalse("Unused topic not expired", metadata.containsTopic("topic1"));
 
         // Test that topic is not expired if used within the expiry interval
         metadata.add("topic2");
-        metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
+        metadata.update(emptyMetadataResponse(), time);
         for (int i = 0; i < 3; i++) {
             time += Metadata.TOPIC_EXPIRY_MS / 2;
-            metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
+            metadata.update(emptyMetadataResponse(), time);
             assertTrue("Topic expired even though in use", metadata.containsTopic("topic2"));
             metadata.add("topic2");
         }
@@ -397,9 +388,9 @@ public class MetadataTest {
         HashSet<String> topics = new HashSet<>();
         topics.add("topic4");
         metadata.setTopics(topics);
-        metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
+        metadata.update(emptyMetadataResponse(), time);
         time += Metadata.TOPIC_EXPIRY_MS;
-        metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
+        metadata.update(emptyMetadataResponse(), time);
         assertFalse("Unused topic not expired", metadata.containsTopic("topic4"));
     }
 
@@ -410,17 +401,17 @@ public class MetadataTest {
         // Test that topic is not expired if not used within the expiry interval
         long time = 0;
         metadata.add("topic1");
-        metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
+        metadata.update(emptyMetadataResponse(), time);
         time += Metadata.TOPIC_EXPIRY_MS;
-        metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
+        metadata.update(emptyMetadataResponse(), time);
         assertTrue("Unused topic expired when expiry disabled", metadata.containsTopic("topic1"));
 
         // Test that topic is not expired if used within the expiry interval
         metadata.add("topic2");
-        metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
+        metadata.update(emptyMetadataResponse(), time);
         for (int i = 0; i < 3; i++) {
             time += Metadata.TOPIC_EXPIRY_MS / 2;
-            metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
+            metadata.update(emptyMetadataResponse(), time);
             assertTrue("Topic expired even though in use", metadata.containsTopic("topic2"));
             metadata.add("topic2");
         }
@@ -430,7 +421,7 @@ public class MetadataTest {
         topics.add("topic4");
         metadata.setTopics(topics);
         time += metadataExpireMs * 2;
-        metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
+        metadata.update(emptyMetadataResponse(), time);
         assertTrue("Unused topic expired when expiry disabled", metadata.containsTopic("topic4"));
     }
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index a586af8..0cfb3be 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.clients;
 
-import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.InterruptException;
@@ -24,12 +23,12 @@ import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -38,6 +37,7 @@ import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.stream.Collectors;
 
 /**
  * A mock network client for use testing code
@@ -73,10 +73,7 @@ public class MockClient implements KafkaClient {
 
     private int correlation;
     private final Time time;
-    private final Metadata metadata;
-    private Set<String> unavailableTopics;
-    private Cluster cluster;
-    private Node node = null;
+    private final MockMetadataUpdater metadataUpdater;
     private final Set<String> ready = new HashSet<>();
 
     // Nodes awaiting reconnect backoff, will not be chosen by leastLoadedNode
@@ -97,14 +94,13 @@ public class MockClient implements KafkaClient {
     private volatile NodeApiVersions nodeApiVersions = NodeApiVersions.create();
     private volatile int numBlockingWakeups = 0;
 
-    public MockClient(Time time) {
-        this(time, null);
+    public MockClient(Time time, Metadata metadata) {
+        this(time, new DefaultMockMetadataUpdater(metadata));
     }
 
-    public MockClient(Time time, Metadata metadata) {
+    public MockClient(Time time, MockMetadataUpdater metadataUpdater) {
         this.time = time;
-        this.metadata = metadata;
-        this.unavailableTopics = Collections.emptySet();
+        this.metadataUpdater = metadataUpdater;
         this.blackedOut = new TransientSet<>(time);
         this.unreachable = new TransientSet<>(time);
         this.delayedReady = new TransientSet<>(time);
@@ -280,21 +276,13 @@ public class MockClient implements KafkaClient {
         checkTimeoutOfPendingRequests(now);
 
         List<ClientResponse> copy = new ArrayList<>(this.responses);
-        if (metadata != null && metadata.updateRequested()) {
+        // We skip metadata updates if all nodes are currently blacked out
+        if (metadataUpdater.isUpdateNeeded() && leastLoadedNode(now) != null) {
             MetadataUpdate metadataUpdate = metadataUpdates.poll();
-            if (cluster != null)
-                metadata.update(cluster, this.unavailableTopics, time.milliseconds());
-            if (metadataUpdate == null)
-                metadata.update(metadata.fetch(), this.unavailableTopics, time.milliseconds());
-            else {
-                if (metadataUpdate.expectMatchRefreshTopics
-                    && !metadata.topics().equals(metadataUpdate.cluster.topics())) {
-                    throw new IllegalStateException("The metadata topics does not match expectation. "
-                                                        + "Expected topics: " + metadataUpdate.cluster.topics()
-                                                        + ", asked topics: " + metadata.topics());
-                }
-                this.unavailableTopics = metadataUpdate.unavailableTopics;
-                metadata.update(metadataUpdate.cluster, metadataUpdate.unavailableTopics, time.milliseconds());
+            if (metadataUpdate != null) {
+                metadataUpdater.update(time, metadataUpdate);
+            } else {
+                metadataUpdater.updateWithCurrentMetadata(time);
             }
         }
 
@@ -310,6 +298,7 @@ public class MockClient implements KafkaClient {
         return Math.max(0, currentTimeMs - startTimeMs);
     }
 
+
     private void checkTimeoutOfPendingRequests(long nowMs) {
         ClientRequest request = requests.peek();
         while (request != null && elapsedTimeMs(nowMs, request.createdTimeMs()) > request.requestTimeoutMs()) {
@@ -350,9 +339,9 @@ public class MockClient implements KafkaClient {
 
 
     public void respond(AbstractResponse response, boolean disconnected) {
-        ClientRequest request = null;
-        if (requests.size() > 0)
-            request = requests.remove();
+        if (requests.isEmpty())
+            throw new IllegalStateException("No requests pending for inbound response " + response);
+        ClientRequest request = requests.poll();
         short version = request.requestBuilder().latestAllowedVersion();
         responses.add(new ClientResponse(request.makeHeader(version), request.callback(), request.destination(),
                 request.createdTimeMs(), time.milliseconds(), disconnected, null, null, response));
@@ -463,22 +452,17 @@ public class MockClient implements KafkaClient {
         return futureResponses.size();
     }
 
-    public void prepareMetadataUpdate(Cluster cluster, Set<String> unavailableTopics) {
-        metadataUpdates.add(new MetadataUpdate(cluster, unavailableTopics, false));
+    public void prepareMetadataUpdate(MetadataResponse updateResponse) {
+        prepareMetadataUpdate(updateResponse, false);
     }
 
-    public void prepareMetadataUpdate(Cluster cluster,
-                                      Set<String> unavailableTopics,
+    public void prepareMetadataUpdate(MetadataResponse updateResponse,
                                       boolean expectMatchMetadataTopics) {
-        metadataUpdates.add(new MetadataUpdate(cluster, unavailableTopics, expectMatchMetadataTopics));
-    }
-
-    public void setNode(Node node) {
-        this.node = node;
+        metadataUpdates.add(new MetadataUpdate(updateResponse, expectMatchMetadataTopics));
     }
 
-    public void cluster(Cluster cluster) {
-        this.cluster = cluster;
+    public void updateMetadata(MetadataResponse updateResponse) {
+        metadataUpdater.update(time, new MetadataUpdate(updateResponse, false));
     }
 
     @Override
@@ -534,7 +518,7 @@ public class MockClient implements KafkaClient {
 
     @Override
     public void close() {
-        metadata.close();
+        metadataUpdater.close();
     }
 
     @Override
@@ -545,9 +529,11 @@ public class MockClient implements KafkaClient {
     @Override
     public Node leastLoadedNode(long now) {
         // Consistent with NetworkClient, we do not return nodes awaiting reconnect backoff
-        if (blackedOut.contains(node, now))
-            return null;
-        return this.node;
+        for (Node node : metadataUpdater.fetchNodes()) {
+            if (!blackedOut.contains(node, now))
+                return node;
+        }
+        return null;
     }
 
     /**
@@ -564,15 +550,20 @@ public class MockClient implements KafkaClient {
         this.nodeApiVersions = nodeApiVersions;
     }
 
-    private static class MetadataUpdate {
-        final Cluster cluster;
-        final Set<String> unavailableTopics;
+    public static class MetadataUpdate {
+        final MetadataResponse updateResponse;
         final boolean expectMatchRefreshTopics;
-        MetadataUpdate(Cluster cluster, Set<String> unavailableTopics, boolean expectMatchRefreshTopics) {
-            this.cluster = cluster;
-            this.unavailableTopics = unavailableTopics;
+
+        MetadataUpdate(MetadataResponse updateResponse, boolean expectMatchRefreshTopics) {
+            this.updateResponse = updateResponse;
             this.expectMatchRefreshTopics = expectMatchRefreshTopics;
         }
+
+        private Set<String> topics() {
+            return updateResponse.topicMetadata().stream()
+                    .map(MetadataResponse.TopicMetadata::topic)
+                    .collect(Collectors.toSet());
+        }
     }
 
     private static class TransientSet<T> {
@@ -614,4 +605,64 @@ public class MockClient implements KafkaClient {
 
     }
 
+    /**
+     * This is a dumbed down version of {@link MetadataUpdater} which is used to facilitate
+     * metadata tracking primarily in order to serve {@link KafkaClient#leastLoadedNode(long)}
+     * and bookkeeping through {@link Metadata}. The extensibility allows AdminClient, which does
+     * not rely on {@link Metadata}, to do its own thing.
+     */
+    public interface MockMetadataUpdater {
+        List<Node> fetchNodes();
+
+        boolean isUpdateNeeded();
+
+        void update(Time time, MetadataUpdate update);
+
+        default void updateWithCurrentMetadata(Time time) {}
+
+        default void close() {}
+    }
+
+    private static class DefaultMockMetadataUpdater implements MockMetadataUpdater {
+        private final Metadata metadata;
+        private MetadataUpdate lastUpdate;
+
+        public DefaultMockMetadataUpdater(Metadata metadata) {
+            this.metadata = metadata;
+        }
+
+        @Override
+        public List<Node> fetchNodes() {
+            return metadata.fetch().nodes();
+        }
+
+        @Override
+        public boolean isUpdateNeeded() {
+            return metadata.updateRequested();
+        }
+
+        @Override
+        public void updateWithCurrentMetadata(Time time) {
+            if (lastUpdate == null)
+                throw new IllegalStateException("No previous metadata update to use");
+            update(time, lastUpdate);
+        }
+
+        @Override
+        public void update(Time time, MetadataUpdate update) {
+            if (update.expectMatchRefreshTopics && !metadata.topics().equals(update.topics())) {
+                throw new IllegalStateException("The metadata topics do not match the expectation. "
+                        + "Expected topics: " + update.topics()
+                        + ", asked topics: " + metadata.topics());
+            }
+            metadata.update(update.updateResponse, time.milliseconds());
+            this.lastUpdate = update;
+        }
+
+        @Override
+        public void close() {
+            metadata.close();
+        }
+    }
+
 }
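
Since MockMetadataUpdater is now pluggable, a test double that does not use Metadata at all
(as AdminClient does not) could supply its own implementation; a hypothetical minimal sketch:

    MockClient.MockMetadataUpdater staticUpdater = new MockClient.MockMetadataUpdater() {
        private final List<Node> nodes = Collections.singletonList(new Node(0, "localhost", 9092));

        @Override
        public List<Node> fetchNodes() {
            return nodes; // serves leastLoadedNode() from a fixed node list
        }

        @Override
        public boolean isUpdateNeeded() {
            return false; // never triggers a metadata refresh in poll()
        }

        @Override
        public void update(Time time, MockClient.MetadataUpdate update) {
            throw new UnsupportedOperationException("This test double does not accept updates");
        }
    };
    MockClient client = new MockClient(new MockTime(), staticUpdater);
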
diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index 8abe9a40..5a4e1e8 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.clients;
 
-import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.network.NetworkReceive;
@@ -27,6 +26,7 @@ import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.requests.ApiVersionsResponse;
 import org.apache.kafka.common.requests.MetadataRequest;
+import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.requests.ProduceRequest;
 import org.apache.kafka.common.requests.ResponseHeader;
 import org.apache.kafka.common.utils.LogContext;
@@ -56,9 +56,9 @@ public class NetworkClientTest {
     protected final MockTime time = new MockTime();
     protected final MockSelector selector = new MockSelector(time);
     protected final Metadata metadata = new Metadata(0, Long.MAX_VALUE, true);
-    protected final int nodeId = 1;
-    protected final Cluster cluster = TestUtils.singletonCluster("test", nodeId);
-    protected final Node node = cluster.nodes().get(0);
+    protected final MetadataResponse initialMetadataResponse = TestUtils.metadataUpdateWith(1,
+            Collections.singletonMap("test", 1));
+    protected final Node node = initialMetadataResponse.brokers().iterator().next();
     protected final long reconnectBackoffMsTest = 10 * 1000;
     protected final long reconnectBackoffMaxMsTest = 10 * 10000;
 
@@ -89,7 +89,7 @@ public class NetworkClientTest {
     @Before
     public void setup() {
         selector.reset();
-        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
+        metadata.update(initialMetadataResponse, time.milliseconds());
     }
 
     @Test(expected = IllegalStateException.class)
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java
index e77b48b..89f5fde 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java
@@ -17,12 +17,14 @@
 package org.apache.kafka.clients.admin;
 
 import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.clients.admin.internals.AdminMetadataManager;
 import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 
-import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -42,38 +44,45 @@ public class AdminClientUnitTestEnv implements AutoCloseable {
     private final MockClient mockClient;
     private final KafkaAdminClient adminClient;
 
-    public AdminClientUnitTestEnv(Cluster cluster, String...vals) {
+    public AdminClientUnitTestEnv(Cluster cluster, String... vals) {
         this(Time.SYSTEM, cluster, vals);
     }
 
-    public AdminClientUnitTestEnv(Time time, Cluster cluster, String...vals) {
+    public AdminClientUnitTestEnv(Time time, Cluster cluster, String... vals) {
         this(time, cluster, newStrMap(vals));
     }
 
-    public AdminClientUnitTestEnv(Time time, Cluster cluster, Map<String, Object> config) {
-        this(newMockClient(time, cluster), time, cluster, config);
-    }
-
-    public AdminClientUnitTestEnv(MockClient mockClient, Time time, Cluster cluster) {
-        this(mockClient, time, cluster, newStrMap());
-    }
-
-    private static MockClient newMockClient(Time time, Cluster cluster) {
-        MockClient mockClient = new MockClient(time);
-        mockClient.prepareResponse(new MetadataResponse(cluster.nodes(),
-            cluster.clusterResource().clusterId(),
-            cluster.controller() == null ? MetadataResponse.NO_CONTROLLER_ID : cluster.controller().id(),
-            Collections.emptyList()));
-        return mockClient;
+    public AdminClientUnitTestEnv(Time time, Cluster cluster) {
+        this(time, cluster, newStrMap());
     }
 
-    public AdminClientUnitTestEnv(MockClient mockClient, Time time, Cluster cluster,
-                                  Map<String, Object> config) {
+    public AdminClientUnitTestEnv(Time time, Cluster cluster, Map<String, Object> config) {
         this.time = time;
         this.cluster = cluster;
         AdminClientConfig adminClientConfig = new AdminClientConfig(config);
-        this.mockClient = mockClient;
-        this.adminClient = KafkaAdminClient.createInternal(adminClientConfig, mockClient, time);
+
+        AdminMetadataManager metadataManager = new AdminMetadataManager(new LogContext(),
+                adminClientConfig.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG),
+                adminClientConfig.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG));
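+
+        // AdminClient tracks metadata through AdminMetadataManager rather than Metadata,
+        // so this stub updater only exposes the cluster nodes and rejects direct updates.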
+        this.mockClient = new MockClient(time, new MockClient.MockMetadataUpdater() {
+            @Override
+            public List<Node> fetchNodes() {
+                return cluster.nodes();
+            }
+
+            @Override
+            public boolean isUpdateNeeded() {
+                return false;
+            }
+
+            @Override
+            public void update(Time time, MockClient.MetadataUpdate update) {
+                throw new UnsupportedOperationException();
+            }
+        });
+
+        metadataManager.update(cluster, time.milliseconds());
+        this.adminClient = KafkaAdminClient.createInternal(adminClientConfig, metadataManager, mockClient, time);
     }
 
     public Time time() {
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index dc55ea2..1f62d39 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.clients.admin;
 
+import org.apache.kafka.clients.ClientDnsLookup;
+import org.apache.kafka.clients.ClientUtils;
 import org.apache.kafka.clients.MockClient;
 import org.apache.kafka.clients.NodeApiVersions;
 import org.apache.kafka.clients.admin.DeleteAclsResult.FilterResults;
@@ -191,6 +193,11 @@ public class KafkaAdminClientTest {
                 Collections.emptySet(), nodes.get(controllerIndex));
     }
 
+    private static Cluster mockBootstrapCluster() {
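+        // A single bootstrap address; tests discover the real brokers afterwards via a
+        // prepared MetadataResponse.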
+        return Cluster.bootstrap(ClientUtils.parseAndValidateAddresses(
+                Collections.singletonList("localhost:8121"), ClientDnsLookup.DEFAULT));
+    }
+
     private static AdminClientUnitTestEnv mockClientEnv(String... configVals) {
         return new AdminClientUnitTestEnv(mockCluster(0), configVals);
     }
@@ -206,15 +213,9 @@ public class KafkaAdminClientTest {
      */
     @Test
     public void testTimeoutWithoutMetadata() throws Exception {
-        Cluster cluster = mockCluster(0);
-        MockClient mockClient = new MockClient(Time.SYSTEM);
-        try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockClient,
-                Time.SYSTEM,
-                cluster,
-                newStrMap(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:8121",
-                    AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "10"))) {
+        try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(Time.SYSTEM, mockBootstrapCluster(),
+                newStrMap(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "10"))) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
-            env.kafkaClient().setNode(new Node(0, "localhost", 8121));
             env.kafkaClient().prepareResponse(new CreateTopicsResponse(Collections.singletonMap("myTopic", new ApiError(Errors.NONE, ""))));
             KafkaFuture<Void> future = env.adminClient().createTopics(
                     Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(0, asList(0, 1, 2)))),
@@ -228,16 +229,13 @@ public class KafkaAdminClientTest {
         // This tests the scenario in which we successfully connect to the bootstrap server, but
         // the server disconnects before sending the full response
 
-        Cluster cluster = Cluster.bootstrap(Collections.singletonList(new InetSocketAddress("localhost", 8121)));
-        MockClient mockClient = new MockClient(Time.SYSTEM);
-        mockClient.setNodeApiVersions(NodeApiVersions.create());
-        mockClient.setNode(cluster.nodes().get(0));
-
-        try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockClient, Time.SYSTEM, cluster)) {
+        Cluster cluster = mockBootstrapCluster();
+        try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(Time.SYSTEM, cluster)) {
             Cluster discoveredCluster = mockCluster(0);
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
             env.kafkaClient().prepareResponse(request -> request instanceof MetadataRequest, null, true);
-            env.kafkaClient().prepareResponse(body -> body instanceof MetadataRequest,
-                    new  MetadataResponse(discoveredCluster.nodes(), discoveredCluster.clusterResource().clusterId(),
+            env.kafkaClient().prepareResponse(request -> request instanceof MetadataRequest,
+                    new MetadataResponse(discoveredCluster.nodes(), discoveredCluster.clusterResource().clusterId(),
                             1, Collections.emptyList()));
             env.kafkaClient().prepareResponse(body -> body instanceof CreateTopicsRequest,
                     new CreateTopicsResponse(Collections.singletonMap("myTopic", new ApiError(Errors.NONE, ""))));
@@ -256,13 +254,10 @@ public class KafkaAdminClientTest {
         // which prevents AdminClient from being able to send the initial metadata request
 
         Cluster cluster = Cluster.bootstrap(Collections.singletonList(new InetSocketAddress("localhost", 8121)));
-        MockClient mockClient = new MockClient(Time.SYSTEM);
-        mockClient.setNodeApiVersions(NodeApiVersions.create());
-        mockClient.setNode(cluster.nodes().get(0));
-        mockClient.setUnreachable(cluster.nodes().get(0), 200);
-
-        try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockClient, Time.SYSTEM, cluster)) {
+        try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(Time.SYSTEM, cluster)) {
             Cluster discoveredCluster = mockCluster(0);
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().setUnreachable(cluster.nodes().get(0), 200);
             env.kafkaClient().prepareResponse(body -> body instanceof MetadataRequest,
                     new MetadataResponse(discoveredCluster.nodes(), discoveredCluster.clusterResource().clusterId(),
                             1, Collections.emptyList()));
@@ -283,16 +278,12 @@ public class KafkaAdminClientTest {
     @Test
     public void testPropagatedMetadataFetchException() throws Exception {
         Cluster cluster = mockCluster(0);
-        MockClient mockClient = new MockClient(Time.SYSTEM);
-        mockClient.createPendingAuthenticationError(cluster.nodeById(0),
-            TimeUnit.DAYS.toMillis(1));
-        try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockClient,
-                Time.SYSTEM,
-                cluster,
+        try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(Time.SYSTEM, cluster,
                 newStrMap(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:8121",
                     AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "10"))) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
-            env.kafkaClient().setNode(env.cluster().nodeById(0));
+            env.kafkaClient().createPendingAuthenticationError(cluster.nodeById(0),
+                    TimeUnit.DAYS.toMillis(1));
             env.kafkaClient().prepareResponse(new CreateTopicsResponse(Collections.singletonMap("myTopic", new ApiError(Errors.NONE, ""))));
             KafkaFuture<Void> future = env.adminClient().createTopics(
                 Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(0, asList(0, 1, 2)))),
@@ -305,7 +296,6 @@ public class KafkaAdminClientTest {
     public void testCreateTopics() throws Exception {
         try (AdminClientUnitTestEnv env = mockClientEnv()) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
-            env.kafkaClient().setNode(env.cluster().controller());
             env.kafkaClient().prepareResponse(body -> body instanceof CreateTopicsRequest,
                     new CreateTopicsResponse(Collections.singletonMap("myTopic", new ApiError(Errors.NONE, ""))));
             KafkaFuture<Void> future = env.adminClient().createTopics(
@@ -319,30 +309,26 @@ public class KafkaAdminClientTest {
     public void testCreateTopicsRetryBackoff() throws Exception {
         Cluster cluster = mockCluster(0);
         MockTime time = new MockTime();
-        MockClient mockClient = new MockClient(time);
         int retryBackoff = 100;
 
-        mockClient.prepareResponse(body -> body instanceof MetadataRequest,
-                new MetadataResponse(cluster.nodes(), cluster.clusterResource().clusterId(),
-                        1, Collections.emptyList()));
+        try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, cluster,
+                newStrMap(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG, "" + retryBackoff))) {
+            MockClient mockClient = env.kafkaClient();
 
-        AtomicLong firstAttemptTime = new AtomicLong(0);
-        AtomicLong secondAttemptTime = new AtomicLong(0);
+            mockClient.setNodeApiVersions(NodeApiVersions.create());
 
-        mockClient.prepareResponse(body -> {
-            firstAttemptTime.set(time.milliseconds());
-            return body instanceof CreateTopicsRequest;
-        }, null, true);
+            AtomicLong firstAttemptTime = new AtomicLong(0);
+            AtomicLong secondAttemptTime = new AtomicLong(0);
 
-        mockClient.prepareResponse(body -> {
-            secondAttemptTime.set(time.milliseconds());
-            return body instanceof CreateTopicsRequest;
-        }, new CreateTopicsResponse(Collections.singletonMap("myTopic", new ApiError(Errors.NONE, ""))));
+            mockClient.prepareResponse(body -> {
+                firstAttemptTime.set(time.milliseconds());
+                return body instanceof CreateTopicsRequest;
+            }, null, true);
 
-        try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockClient, time, cluster,
-                newStrMap(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG, "" + retryBackoff))) {
-            mockClient.setNodeApiVersions(NodeApiVersions.create());
-            mockClient.setNode(env.cluster().controller());
+            mockClient.prepareResponse(body -> {
+                secondAttemptTime.set(time.milliseconds());
+                return body instanceof CreateTopicsRequest;
+            }, new CreateTopicsResponse(Collections.singletonMap("myTopic", new ApiError(Errors.NONE, ""))));
 
             KafkaFuture<Void> future = env.adminClient().createTopics(
                     Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(0, asList(0, 1, 2)))),
@@ -354,18 +340,17 @@ public class KafkaAdminClientTest {
             time.sleep(retryBackoff);
 
             future.get();
-        }
 
-        long actualRetryBackoff = secondAttemptTime.get() - firstAttemptTime.get();
-        assertEquals("CreateTopics retry did not await expected backoff",
-                retryBackoff, actualRetryBackoff);
+            long actualRetryBackoff = secondAttemptTime.get() - firstAttemptTime.get();
+            assertEquals("CreateTopics retry did not await expected backoff",
+                    retryBackoff, actualRetryBackoff);
+        }
     }
 
     @Test
     public void testCreateTopicsHandleNotControllerException() throws Exception {
         try (AdminClientUnitTestEnv env = mockClientEnv()) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
-            env.kafkaClient().setNode(env.cluster().nodeById(0));
             env.kafkaClient().prepareResponseFrom(new CreateTopicsResponse(
                 Collections.singletonMap("myTopic", new ApiError(Errors.NOT_CONTROLLER, ""))),
                 env.cluster().nodeById(0));
@@ -387,7 +372,6 @@ public class KafkaAdminClientTest {
     public void testDeleteTopics() throws Exception {
         try (AdminClientUnitTestEnv env = mockClientEnv()) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
-            env.kafkaClient().setNode(env.cluster().controller());
 
             env.kafkaClient().prepareResponse(body -> body instanceof DeleteTopicsRequest,
                     new DeleteTopicsResponse(Collections.singletonMap("myTopic", Errors.NONE)));
@@ -413,7 +397,6 @@ public class KafkaAdminClientTest {
     public void testInvalidTopicNames() throws Exception {
         try (AdminClientUnitTestEnv env = mockClientEnv()) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
-            env.kafkaClient().setNode(env.cluster().controller());
 
             List<String> sillyTopicNames = asList("", null);
             Map<String, KafkaFuture<Void>> deleteFutures = env.adminClient().deleteTopics(sillyTopicNames).values();
@@ -447,12 +430,10 @@ public class KafkaAdminClientTest {
         // We should continue retrying on metadata update failures in spite of retry configuration
 
         String topic = "topic";
-        MockClient mockClient = new MockClient(Time.SYSTEM);
         Cluster bootstrapCluster = Cluster.bootstrap(singletonList(new InetSocketAddress("localhost", 9999)));
         Cluster initializedCluster = mockCluster(0);
-        mockClient.setNode(bootstrapCluster.nodes().get(0));
 
-        try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockClient, Time.SYSTEM, bootstrapCluster,
+        try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(Time.SYSTEM, bootstrapCluster,
                 newStrMap(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999",
                         AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000000",
                         AdminClientConfig.RETRIES_CONFIG, "0"))) {
@@ -484,17 +465,12 @@ public class KafkaAdminClientTest {
 
     @Test
     public void testAdminClientApisAuthenticationFailure() throws Exception {
-        Cluster cluster = mockCluster(0);
-        MockClient mockClient = new MockClient(Time.SYSTEM);
-        mockClient.createPendingAuthenticationError(cluster.nodeById(0),
-            TimeUnit.DAYS.toMillis(1));
-        try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockClient,
-                Time.SYSTEM,
-                cluster,
-                newStrMap(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:8121",
-                     AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000"))) {
+        Cluster cluster = mockBootstrapCluster();
+        try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(Time.SYSTEM, cluster,
+                newStrMap(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000"))) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
-            env.kafkaClient().setNode(env.cluster().controller());
+            env.kafkaClient().createPendingAuthenticationError(cluster.nodes().get(0),
+                    TimeUnit.DAYS.toMillis(1));
             callAdminClientApisAndExpectAnAuthenticationError(env);
         }
     }
@@ -570,7 +546,6 @@ public class KafkaAdminClientTest {
     public void testDescribeAcls() throws Exception {
         try (AdminClientUnitTestEnv env = mockClientEnv()) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
-            env.kafkaClient().setNode(env.cluster().controller());
 
             // Test a call where we get back ACL1 and ACL2.
             env.kafkaClient().prepareResponse(new DescribeAclsResponse(0, ApiError.NONE,
@@ -597,7 +572,6 @@ public class KafkaAdminClientTest {
     public void testCreateAcls() throws Exception {
         try (AdminClientUnitTestEnv env = mockClientEnv()) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
-            env.kafkaClient().setNode(env.cluster().controller());
 
             // Test a call where we successfully create two ACLs.
             env.kafkaClient().prepareResponse(new CreateAclsResponse(0,
@@ -625,7 +599,6 @@ public class KafkaAdminClientTest {
     public void testDeleteAcls() throws Exception {
         try (AdminClientUnitTestEnv env = mockClientEnv()) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
-            env.kafkaClient().setNode(env.cluster().controller());
 
             // Test a call where one filter has an error.
             env.kafkaClient().prepareResponse(new DeleteAclsResponse(0, asList(
@@ -677,7 +650,6 @@ public class KafkaAdminClientTest {
             AdminClientConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, "1",
                 AdminClientConfig.RECONNECT_BACKOFF_MS_CONFIG, "1")) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
-            env.kafkaClient().setNode(nodes.get(0));
             assertEquals(time, env.time());
             assertEquals(env.time(), ((KafkaAdminClient) env.adminClient()).time());
 
@@ -718,7 +690,6 @@ public class KafkaAdminClientTest {
     public void testDescribeConfigs() throws Exception {
         try (AdminClientUnitTestEnv env = mockClientEnv()) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
-            env.kafkaClient().setNode(env.cluster().controller());
             env.kafkaClient().prepareResponse(new DescribeConfigsResponse(0,
                 Collections.singletonMap(new ConfigResource(ConfigResource.Type.BROKER, "0"),
                     new DescribeConfigsResponse.Config(ApiError.NONE, Collections.emptySet()))));
@@ -732,7 +703,6 @@ public class KafkaAdminClientTest {
     public void testCreatePartitions() throws Exception {
         try (AdminClientUnitTestEnv env = mockClientEnv()) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
-            env.kafkaClient().setNode(env.cluster().controller());
 
             Map<String, ApiError> m = new HashMap<>();
             m.put("my_topic", ApiError.NONE);
@@ -784,7 +754,6 @@ public class KafkaAdminClientTest {
 
         try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
-            env.kafkaClient().setNode(env.cluster().nodes().get(0));
 
             Map<TopicPartition, DeleteRecordsResponse.PartitionResponse> m = new HashMap<>();
             m.put(myTopicPartition0,
@@ -889,7 +858,6 @@ public class KafkaAdminClientTest {
 
         try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
-            env.kafkaClient().setNode(env.cluster().controller());
 
             // Empty metadata response should be retried
             env.kafkaClient().prepareResponse(
@@ -991,7 +959,6 @@ public class KafkaAdminClientTest {
         try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, cluster,
                 AdminClientConfig.RETRIES_CONFIG, "0")) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
-            env.kafkaClient().setNode(env.cluster().controller());
 
             // Empty metadata causes the request to fail since we have no list of brokers
             // to send the ListGroups requests to
@@ -1022,7 +989,6 @@ public class KafkaAdminClientTest {
 
         try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
-            env.kafkaClient().setNode(env.cluster().controller());
 
             env.kafkaClient().prepareResponse(new FindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
 
@@ -1085,7 +1051,6 @@ public class KafkaAdminClientTest {
 
         try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
-            env.kafkaClient().setNode(env.cluster().controller());
 
             env.kafkaClient().prepareResponse(new FindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
 
@@ -1129,7 +1094,6 @@ public class KafkaAdminClientTest {
 
         try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
-            env.kafkaClient().setNode(env.cluster().controller());
 
             // Retriable FindCoordinatorResponse errors should be retried
             env.kafkaClient().prepareResponse(new FindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode()));
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 4ac5876..89fca84 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -32,7 +32,6 @@ import org.apache.kafka.clients.consumer.internals.SubscriptionState;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
-import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.InterruptException;
@@ -59,6 +58,7 @@ import org.apache.kafka.common.requests.JoinGroupResponse;
 import org.apache.kafka.common.requests.LeaveGroupResponse;
 import org.apache.kafka.common.requests.ListOffsetRequest;
 import org.apache.kafka.common.requests.ListOffsetResponse;
+import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.requests.OffsetCommitRequest;
 import org.apache.kafka.common.requests.OffsetCommitResponse;
 import org.apache.kafka.common.requests.OffsetFetchResponse;
@@ -361,14 +361,12 @@ public class KafkaConsumerTest {
     @Test
     public void verifyHeartbeatSent() throws Exception {
         Time time = new MockTime();
-        Cluster cluster = TestUtils.singletonCluster(topic, 1);
-        Node node = cluster.nodes().get(0);
-
         Metadata metadata = createMetadata();
-        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
-
         MockClient client = new MockClient(time, metadata);
-        client.setNode(node);
+
+        initMetadata(client, Collections.singletonMap(topic, 1));
+        Node node = metadata.fetch().nodes().get(0);
+
         PartitionAssignor assignor = new RoundRobinAssignor();
 
         KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, true);
@@ -396,14 +394,11 @@ public class KafkaConsumerTest {
     @Test
     public void verifyHeartbeatSentWhenFetchedDataReady() throws Exception {
         Time time = new MockTime();
-        Cluster cluster = TestUtils.singletonCluster(topic, 1);
-        Node node = cluster.nodes().get(0);
-
         Metadata metadata = createMetadata();
-        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
-
         MockClient client = new MockClient(time, metadata);
-        client.setNode(node);
+
+        initMetadata(client, Collections.singletonMap(topic, 1));
+        Node node = metadata.fetch().nodes().get(0);
         PartitionAssignor assignor = new RoundRobinAssignor();
 
         KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, true);
@@ -430,16 +425,14 @@ public class KafkaConsumerTest {
     }
 
     @Test
-    public void verifyPollTimesOutDuringMetadataUpdate() throws Exception {
+    public void verifyPollTimesOutDuringMetadataUpdate() {
         final Time time = new MockTime();
-        final Cluster cluster = TestUtils.singletonCluster(topic, 1);
-        final Node node = cluster.nodes().get(0);
+        Metadata metadata = createMetadata();
+        final MockClient client = new MockClient(time, metadata);
 
-        final Metadata metadata = createMetadata();
-        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
+        initMetadata(client, Collections.singletonMap(topic, 1));
+        Node node = metadata.fetch().nodes().get(0);
 
-        final MockClient client = new MockClient(time, metadata);
-        client.setNode(node);
         final PartitionAssignor assignor = new RoundRobinAssignor();
 
         final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, true);
@@ -454,16 +447,14 @@ public class KafkaConsumerTest {
     }
 
     @Test
-    public void verifyDeprecatedPollDoesNotTimeOutDuringMetadataUpdate() throws Exception {
+    public void verifyDeprecatedPollDoesNotTimeOutDuringMetadataUpdate() {
         final Time time = new MockTime();
-        final Cluster cluster = TestUtils.singletonCluster(topic, 1);
-        final Node node = cluster.nodes().get(0);
+        Metadata metadata = createMetadata();
+        final MockClient client = new MockClient(time, metadata);
 
-        final Metadata metadata = createMetadata();
-        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
+        initMetadata(client, Collections.singletonMap(topic, 1));
+        Node node = metadata.fetch().nodes().get(0);
 
-        final MockClient client = new MockClient(time, metadata);
-        client.setNode(node);
         final PartitionAssignor assignor = new RoundRobinAssignor();
 
         final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, true);
@@ -483,14 +474,10 @@ public class KafkaConsumerTest {
     @Test
     public void verifyNoCoordinatorLookupForManualAssignmentWithSeek() {
         Time time = new MockTime();
-        Cluster cluster = TestUtils.singletonCluster(topic, 1);
-        Node node = cluster.nodes().get(0);
-
         Metadata metadata = createMetadata();
-        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
-
         MockClient client = new MockClient(time, metadata);
-        client.setNode(node);
+
+        initMetadata(client, Collections.singletonMap(topic, 1));
         PartitionAssignor assignor = new RoundRobinAssignor();
 
         KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, true);
@@ -514,14 +501,9 @@ public class KafkaConsumerTest {
         // a reset on another partition.
 
         Time time = new MockTime();
-        Cluster cluster = TestUtils.singletonCluster(topic, 2);
-        Node node = cluster.nodes().get(0);
-
         Metadata metadata = createMetadata();
-        metadata.update(cluster, Collections.emptySet(), time.milliseconds());
-
         MockClient client = new MockClient(time, metadata);
-        client.setNode(node);
+        initMetadata(client, Collections.singletonMap(topic, 2));
 
         KafkaConsumer<String, String> consumer = newConsumerNoAutoCommit(time, client, metadata);
         consumer.assign(Arrays.asList(tp0, tp1));
@@ -555,17 +537,20 @@ public class KafkaConsumerTest {
         assertEquals(singleton(tp0), records.partitions());
     }
 
+    private void initMetadata(MockClient mockClient, Map<String, Integer> partitionCounts) {
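+        // Seed the mock client with a single-broker MetadataResponse covering the given
+        // partition counts, in place of the old cluster-building plus setNode() wiring.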
+        MetadataResponse initialMetadata = TestUtils.metadataUpdateWith(1, partitionCounts);
+        mockClient.updateMetadata(initialMetadata);
+    }
+
     @Test(expected = NoOffsetForPartitionException.class)
     public void testMissingOffsetNoResetPolicy() {
         Time time = new MockTime();
-        Cluster cluster = TestUtils.singletonCluster(topic, 1);
-        Node node = cluster.nodes().get(0);
-
         Metadata metadata = createMetadata();
-        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
-
         MockClient client = new MockClient(time, metadata);
-        client.setNode(node);
+
+        initMetadata(client, Collections.singletonMap(topic, 1));
+        Node node = metadata.fetch().nodes().get(0);
+
         PartitionAssignor assignor = new RoundRobinAssignor();
 
         KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
@@ -583,14 +568,12 @@ public class KafkaConsumerTest {
     @Test
     public void testResetToCommittedOffset() {
         Time time = new MockTime();
-        Cluster cluster = TestUtils.singletonCluster(topic, 1);
-        Node node = cluster.nodes().get(0);
-
         Metadata metadata = createMetadata();
-        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
-
         MockClient client = new MockClient(time, metadata);
-        client.setNode(node);
+
+        initMetadata(client, Collections.singletonMap(topic, 1));
+        Node node = metadata.fetch().nodes().get(0);
+
         PartitionAssignor assignor = new RoundRobinAssignor();
 
         KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
@@ -609,14 +592,12 @@ public class KafkaConsumerTest {
     @Test
     public void testResetUsingAutoResetPolicy() {
         Time time = new MockTime();
-        Cluster cluster = TestUtils.singletonCluster(topic, 1);
-        Node node = cluster.nodes().get(0);
-
         Metadata metadata = createMetadata();
-        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
-
         MockClient client = new MockClient(time, metadata);
-        client.setNode(node);
+
+        initMetadata(client, Collections.singletonMap(topic, 1));
+        Node node = metadata.fetch().nodes().get(0);
+
         PartitionAssignor assignor = new RoundRobinAssignor();
 
         KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
@@ -640,14 +621,12 @@ public class KafkaConsumerTest {
         long offset2 = 20000;
 
         Time time = new MockTime();
-        Cluster cluster = TestUtils.singletonCluster(topic, 2);
-        Node node = cluster.nodes().get(0);
-
         Metadata metadata = createMetadata();
-        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
-
         MockClient client = new MockClient(time, metadata);
-        client.setNode(node);
+
+        initMetadata(client, Collections.singletonMap(topic, 2));
+        Node node = metadata.fetch().nodes().get(0);
+
         PartitionAssignor assignor = new RoundRobinAssignor();
 
         KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, true);
@@ -679,14 +658,12 @@ public class KafkaConsumerTest {
     @Test
     public void testAutoCommitSentBeforePositionUpdate() {
         Time time = new MockTime();
-        Cluster cluster = TestUtils.singletonCluster(topic, 1);
-        Node node = cluster.nodes().get(0);
-
         Metadata metadata = createMetadata();
-        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
-
         MockClient client = new MockClient(time, metadata);
-        client.setNode(node);
+
+        initMetadata(client, Collections.singletonMap(topic, 1));
+        Node node = metadata.fetch().nodes().get(0);
+
         PartitionAssignor assignor = new RoundRobinAssignor();
 
         KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, true);
@@ -717,17 +694,15 @@ public class KafkaConsumerTest {
     public void testRegexSubscription() {
         String unmatchedTopic = "unmatched";
         Time time = new MockTime();
-
-        Map<String, Integer> topicMetadata = new HashMap<>();
-        topicMetadata.put(topic, 1);
-        topicMetadata.put(unmatchedTopic, 1);
-
-        Cluster cluster = TestUtils.clusterWith(1, topicMetadata);
         Metadata metadata = createMetadata();
-        Node node = cluster.nodes().get(0);
-
         MockClient client = new MockClient(time, metadata);
-        client.setNode(node);
+
+        Map<String, Integer> partitionCounts = new HashMap<>();
+        partitionCounts.put(topic, 1);
+        partitionCounts.put(unmatchedTopic, 1);
+        initMetadata(client, partitionCounts);
+        Node node = metadata.fetch().nodes().get(0);
+
         PartitionAssignor assignor = new RoundRobinAssignor();
 
         KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, true);
@@ -735,7 +710,7 @@ public class KafkaConsumerTest {
 
         consumer.subscribe(Pattern.compile(topic), getConsumerRebalanceListener(consumer));
 
-        client.prepareMetadataUpdate(cluster, Collections.<String>emptySet());
+        client.prepareMetadataUpdate(TestUtils.metadataUpdateWith(1, partitionCounts));
 
         consumer.updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE));
 
@@ -752,20 +727,14 @@ public class KafkaConsumerTest {
         TopicPartition otherTopicPartition = new TopicPartition(otherTopic, 0);
 
         Time time = new MockTime();
-
-        Map<String, Integer> topicMetadata = new HashMap<>();
-        topicMetadata.put(topic, 1);
-        topicMetadata.put(otherTopic, 1);
-
-        Cluster cluster = TestUtils.clusterWith(1, topicMetadata);
         Metadata metadata = createMetadata();
-        Node node = cluster.nodes().get(0);
-
         MockClient client = new MockClient(time, metadata);
-        client.setNode(node);
-        client.cluster(cluster);
 
-        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
+        Map<String, Integer> partitionCounts = new HashMap<>();
+        partitionCounts.put(topic, 1);
+        partitionCounts.put(otherTopic, 1);
+        initMetadata(client, partitionCounts);
+        Node node = metadata.fetch().nodes().get(0);
 
         KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, false);
 
@@ -779,6 +748,7 @@ public class KafkaConsumerTest {
 
         consumer.subscribe(Pattern.compile(otherTopic), getConsumerRebalanceListener(consumer));
 
+        client.prepareMetadataUpdate(TestUtils.metadataUpdateWith(1, partitionCounts));
         prepareRebalance(client, node, singleton(otherTopic), assignor, singletonList(otherTopicPartition), coordinator);
         consumer.poll(Duration.ZERO);
 
@@ -789,14 +759,12 @@ public class KafkaConsumerTest {
     @Test
     public void testWakeupWithFetchDataAvailable() throws Exception {
         final Time time = new MockTime();
-        Cluster cluster = TestUtils.singletonCluster(topic, 1);
-        Node node = cluster.nodes().get(0);
-
         Metadata metadata = createMetadata();
-        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
-
         MockClient client = new MockClient(time, metadata);
-        client.setNode(node);
+
+        initMetadata(client, Collections.singletonMap(topic, 1));
+        Node node = metadata.fetch().nodes().get(0);
+
         PartitionAssignor assignor = new RoundRobinAssignor();
 
         KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, true);
@@ -838,16 +806,14 @@ public class KafkaConsumerTest {
     }
 
     @Test
-    public void testPollThrowsInterruptExceptionIfInterrupted() throws Exception {
+    public void testPollThrowsInterruptExceptionIfInterrupted() {
         final Time time = new MockTime();
-        Cluster cluster = TestUtils.singletonCluster(topic, 1);
-        final Node node = cluster.nodes().get(0);
+        final Metadata metadata = createMetadata();
+        final MockClient client = new MockClient(time, metadata);
 
-        Metadata metadata = createMetadata();
-        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
+        initMetadata(client, Collections.singletonMap(topic, 1));
+        Node node = metadata.fetch().nodes().get(0);
 
-        final MockClient client = new MockClient(time, metadata);
-        client.setNode(node);
         final PartitionAssignor assignor = new RoundRobinAssignor();
 
         KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, false);
@@ -872,14 +838,12 @@ public class KafkaConsumerTest {
     @Test
     public void fetchResponseWithUnexpectedPartitionIsIgnored() {
         Time time = new MockTime();
-        Cluster cluster = TestUtils.singletonCluster(singletonMap(topic, 1));
-        Node node = cluster.nodes().get(0);
-
         Metadata metadata = createMetadata();
-        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
-
         MockClient client = new MockClient(time, metadata);
-        client.setNode(node);
+
+        initMetadata(client, Collections.singletonMap(topic, 1));
+        Node node = metadata.fetch().nodes().get(0);
+
         PartitionAssignor assignor = new RangeAssignor();
 
         KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, true);
@@ -909,18 +873,16 @@ public class KafkaConsumerTest {
     @Test
     public void testSubscriptionChangesWithAutoCommitEnabled() {
         Time time = new MockTime();
+        Metadata metadata = createMetadata();
+        MockClient client = new MockClient(time, metadata);
+
         Map<String, Integer> tpCounts = new HashMap<>();
         tpCounts.put(topic, 1);
         tpCounts.put(topic2, 1);
         tpCounts.put(topic3, 1);
-        Cluster cluster = TestUtils.singletonCluster(tpCounts);
-        Node node = cluster.nodes().get(0);
+        initMetadata(client, tpCounts);
+        Node node = metadata.fetch().nodes().get(0);
 
-        Metadata metadata = createMetadata();
-        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
-
-        MockClient client = new MockClient(time, metadata);
-        client.setNode(node);
         PartitionAssignor assignor = new RangeAssignor();
 
         KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, true);
@@ -1025,17 +987,15 @@ public class KafkaConsumerTest {
     @Test
     public void testSubscriptionChangesWithAutoCommitDisabled() {
         Time time = new MockTime();
+        Metadata metadata = createMetadata();
+        MockClient client = new MockClient(time, metadata);
+
         Map<String, Integer> tpCounts = new HashMap<>();
         tpCounts.put(topic, 1);
         tpCounts.put(topic2, 1);
-        Cluster cluster = TestUtils.singletonCluster(tpCounts);
-        Node node = cluster.nodes().get(0);
+        initMetadata(client, tpCounts);
+        Node node = metadata.fetch().nodes().get(0);
 
-        Metadata metadata = createMetadata();
-        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
-
-        MockClient client = new MockClient(time, metadata);
-        client.setNode(node);
         PartitionAssignor assignor = new RangeAssignor();
 
         KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, false);
@@ -1088,17 +1048,15 @@ public class KafkaConsumerTest {
     @Test
     public void testManualAssignmentChangeWithAutoCommitEnabled() {
         Time time = new MockTime();
+        Metadata metadata = createMetadata();
+        MockClient client = new MockClient(time, metadata);
+
         Map<String, Integer> tpCounts = new HashMap<>();
         tpCounts.put(topic, 1);
         tpCounts.put(topic2, 1);
-        Cluster cluster = TestUtils.singletonCluster(tpCounts);
-        Node node = cluster.nodes().get(0);
-
-        Metadata metadata = createMetadata();
-        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
+        initMetadata(client, tpCounts);
+        Node node = metadata.fetch().nodes().get(0);
 
-        MockClient client = new MockClient(time, metadata);
-        client.setNode(node);
         PartitionAssignor assignor = new RangeAssignor();
 
         KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, true);
@@ -1145,17 +1103,15 @@ public class KafkaConsumerTest {
     @Test
     public void testManualAssignmentChangeWithAutoCommitDisabled() {
         Time time = new MockTime();
+        Metadata metadata = createMetadata();
+        MockClient client = new MockClient(time, metadata);
+
         Map<String, Integer> tpCounts = new HashMap<>();
         tpCounts.put(topic, 1);
         tpCounts.put(topic2, 1);
-        Cluster cluster = TestUtils.singletonCluster(tpCounts);
-        Node node = cluster.nodes().get(0);
-
-        Metadata metadata = createMetadata();
-        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
+        initMetadata(client, tpCounts);
+        Node node = metadata.fetch().nodes().get(0);
 
-        MockClient client = new MockClient(time, metadata);
-        client.setNode(node);
         PartitionAssignor assignor = new RangeAssignor();
 
         KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, false);
@@ -1203,14 +1159,12 @@ public class KafkaConsumerTest {
     @Test
     public void testOffsetOfPausedPartitions() {
         Time time = new MockTime();
-        Cluster cluster = TestUtils.singletonCluster(topic, 2);
-        Node node = cluster.nodes().get(0);
-
         Metadata metadata = createMetadata();
-        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
-
         MockClient client = new MockClient(time, metadata);
-        client.setNode(node);
+
+        initMetadata(client, Collections.singletonMap(topic, 2));
+        Node node = metadata.fetch().nodes().get(0);
+
         PartitionAssignor assignor = new RangeAssignor();
 
         KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, true);
@@ -1334,14 +1288,12 @@ public class KafkaConsumerTest {
     @Test
     public void shouldAttemptToRejoinGroupAfterSyncGroupFailed() throws Exception {
         Time time = new MockTime();
-        Cluster cluster = TestUtils.singletonCluster(topic, 1);
-        Node node = cluster.nodes().get(0);
+        Metadata metadata = createMetadata();
+        MockClient client = new MockClient(time, metadata);
 
-        Metadata metadata = new Metadata(0, Long.MAX_VALUE, false);
-        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
+        initMetadata(client, Collections.singletonMap(topic, 1));
+        Node node = metadata.fetch().nodes().get(0);
 
-        MockClient client = new MockClient(time, metadata);
-        client.setNode(node);
         PartitionAssignor assignor = new RoundRobinAssignor();
 
         KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, false);
@@ -1404,21 +1356,19 @@ public class KafkaConsumerTest {
                                    long waitMs,
                                    boolean interrupt) throws Exception {
         Time time = new MockTime();
-        Cluster cluster = TestUtils.singletonCluster(topic, 1);
-        Node node = cluster.nodes().get(0);
-
         Metadata metadata = createMetadata();
-        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
-
         MockClient client = new MockClient(time, metadata);
-        client.setNode(node);
+
+        initMetadata(client, Collections.singletonMap(topic, 1));
+        Node node = metadata.fetch().nodes().get(0);
+
         PartitionAssignor assignor = new RoundRobinAssignor();
 
         final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, false);
         consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
         Node coordinator = prepareRebalance(client, node, assignor, singletonList(tp0), null);
 
-        client.prepareMetadataUpdate(cluster, Collections.<String>emptySet());
+        client.prepareMetadataUpdate(TestUtils.metadataUpdateWith(1, Collections.singletonMap(topic, 1)));
 
         consumer.updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE));
 
@@ -1542,16 +1492,12 @@ public class KafkaConsumerTest {
 
     private KafkaConsumer<String, String> consumerWithPendingAuthentication() {
         Time time = new MockTime();
-        Map<String, Integer> tpCounts = new HashMap<>();
-        tpCounts.put(topic, 1);
-        Cluster cluster = TestUtils.singletonCluster(tpCounts);
-        Node node = cluster.nodes().get(0);
-
         Metadata metadata = createMetadata();
-        metadata.update(cluster, Collections.emptySet(), time.milliseconds());
-
         MockClient client = new MockClient(time, metadata);
-        client.setNode(node);
+
+        initMetadata(client, singletonMap(topic, 1));
+        Node node = metadata.fetch().nodes().get(0);
+
         PartitionAssignor assignor = new RangeAssignor();
 
         client.createPendingAuthenticationError(node, 0);
@@ -1861,29 +1807,24 @@ public class KafkaConsumerTest {
     @Test(expected = InvalidTopicException.class)
     public void testSubscriptionOnInvalidTopic() {
         Time time = new MockTime();
-        Cluster cluster = TestUtils.singletonCluster();
-        Node node = cluster.nodes().get(0);
-
         Metadata metadata = createMetadata();
-        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
-
         MockClient client = new MockClient(time, metadata);
-        client.setNode(node);
+
+        initMetadata(client, Collections.singletonMap(topic, 1));
+        Cluster cluster = metadata.fetch();
+
         PartitionAssignor assignor = new RoundRobinAssignor();
 
         String invalidTopicName = "topic abc";  // Invalid topic name due to space
 
-        Set<String> invalidTopic = new HashSet<String>();
-        invalidTopic.add(invalidTopicName);
-        Cluster metadataUpdateResponseCluster = new Cluster(cluster.clusterResource().clusterId(),
-                                                            cluster.nodes(),
-                                                            new ArrayList<PartitionInfo>(0),
-                                                            Collections.<String>emptySet(),
-                                                            invalidTopic,
-                                                            cluster.internalTopics(),
-                                                            cluster.controller());
-        client.prepareMetadataUpdate(metadataUpdateResponseCluster, Collections.<String>emptySet());
-
+        List<MetadataResponse.TopicMetadata> topicMetadata = new ArrayList<>();
+        topicMetadata.add(new MetadataResponse.TopicMetadata(Errors.INVALID_TOPIC_EXCEPTION,
+                invalidTopicName, false, Collections.emptyList()));
+        MetadataResponse updateResponse = new MetadataResponse(cluster.nodes(),
+                cluster.clusterResource().clusterId(),
+                cluster.controller().id(),
+                topicMetadata);
+        client.prepareMetadataUpdate(updateResponse);
 
         KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, true);
         consumer.subscribe(singleton(invalidTopicName), getConsumerRebalanceListener(consumer));
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
index 004445f..a0e7ab0 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
@@ -18,7 +18,6 @@ package org.apache.kafka.clients.consumer.internals;
 
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.MockClient;
-import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.WakeupException;
@@ -52,6 +51,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import static java.util.Collections.emptyMap;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotSame;
@@ -87,18 +87,15 @@ public class AbstractCoordinatorTest {
 
     private void setupCoordinator(int retryBackoffMs, int rebalanceTimeoutMs) {
         this.mockTime = new MockTime();
-        this.mockClient = new MockClient(mockTime);
-
         Metadata metadata = new Metadata(retryBackoffMs, 60 * 60 * 1000L, true);
+
+        this.mockClient = new MockClient(mockTime, metadata);
         this.consumerClient = new ConsumerNetworkClient(new LogContext(), mockClient, metadata, mockTime,
                 retryBackoffMs, REQUEST_TIMEOUT_MS, HEARTBEAT_INTERVAL_MS);
         Metrics metrics = new Metrics();
 
-        Cluster cluster = TestUtils.singletonCluster("topic", 1);
-        metadata.update(cluster, Collections.emptySet(), mockTime.milliseconds());
-        this.node = cluster.nodes().get(0);
-        mockClient.setNode(node);
-
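+        // Seed a one-broker metadata response (no topics needed here) and derive the
+        // node under test from it.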
+        mockClient.updateMetadata(TestUtils.metadataUpdateWith(1, emptyMap()));
+        this.node = metadata.fetch().nodes().get(0);
         this.coordinatorNode = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
         this.coordinator = new DummyCoordinator(consumerClient, metrics, mockTime, rebalanceTimeoutMs, retryBackoffMs);
     }
@@ -244,11 +241,11 @@ public class AbstractCoordinatorTest {
     public void testLookupCoordinator() {
         setupCoordinator();
 
-        mockClient.setNode(null);
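+        // Black out the only known node so that no brokers are available for the lookup.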
+        mockClient.blackout(node, 50);
         RequestFuture<Void> noBrokersAvailableFuture = coordinator.lookupCoordinator();
         assertTrue("Failed future expected", noBrokersAvailableFuture.failed());
+        mockTime.sleep(50);
 
-        mockClient.setNode(node);
         RequestFuture<Void> future = coordinator.lookupCoordinator();
         assertFalse("Request not sent", future.isDone());
         assertSame("New request sent while one is in progress", future, coordinator.lookupCoordinator());
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index af073f1..72808c8 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -28,7 +28,6 @@ import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.clients.consumer.RangeAssignor;
 import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
 import org.apache.kafka.clients.consumer.RoundRobinAssignor;
-import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
@@ -40,6 +39,7 @@ import org.apache.kafka.common.errors.DisconnectException;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.OffsetMetadataTooLarge;
 import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.internals.Topic;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.RecordBatch;
@@ -51,6 +51,7 @@ import org.apache.kafka.common.requests.JoinGroupRequest.ProtocolMetadata;
 import org.apache.kafka.common.requests.JoinGroupResponse;
 import org.apache.kafka.common.requests.LeaveGroupRequest;
 import org.apache.kafka.common.requests.LeaveGroupResponse;
+import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.requests.OffsetCommitRequest;
 import org.apache.kafka.common.requests.OffsetCommitResponse;
 import org.apache.kafka.common.requests.OffsetFetchResponse;
@@ -114,13 +115,13 @@ public class ConsumerCoordinatorTest {
     private MockPartitionAssignor partitionAssignor = new MockPartitionAssignor();
     private List<PartitionAssignor> assignors = Collections.singletonList(partitionAssignor);
     private MockClient client;
-    private Cluster cluster = TestUtils.clusterWith(1, new HashMap<String, Integer>() {
+    private MetadataResponse metadataResponse = TestUtils.metadataUpdateWith(1, new HashMap<String, Integer>() {
         {
             put(topic1, 1);
             put(topic2, 1);
         }
     });
-    private Node node = cluster.nodes().get(0);
+    private Node node = metadataResponse.brokers().iterator().next();
     private SubscriptionState subscriptions;
     private Metadata metadata;
     private Metrics metrics;
@@ -133,8 +134,8 @@ public class ConsumerCoordinatorTest {
     public void setup() {
         this.subscriptions = new SubscriptionState(OffsetResetStrategy.EARLIEST);
         this.metadata = new Metadata(0, Long.MAX_VALUE, true);
-        this.metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
         this.client = new MockClient(time, metadata);
+        this.client.updateMetadata(metadataResponse);
         this.consumerClient = new ConsumerNetworkClient(new LogContext(), client, metadata, time, 100,
                 requestTimeoutMs, Integer.MAX_VALUE);
         this.metrics = new Metrics(time);
@@ -142,7 +143,6 @@ public class ConsumerCoordinatorTest {
         this.mockOffsetCommitCallback = new MockCommitCallback();
         this.partitionAssignor.clear();
 
-        client.setNode(node);
         this.coordinator = buildCoordinator(metrics, assignors, ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, false, true);
     }
 
@@ -366,7 +366,7 @@ public class ConsumerCoordinatorTest {
 
         // ensure metadata is up-to-date for leader
         metadata.setTopics(singletonList(topic1));
-        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
+        client.updateMetadata(metadataResponse);
 
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
@@ -384,7 +384,7 @@ public class ConsumerCoordinatorTest {
 
         // ensure metadata is up-to-date for leader
         metadata.setTopics(singletonList(topic1));
-        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
+        client.updateMetadata(metadataResponse);
 
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
@@ -423,7 +423,7 @@ public class ConsumerCoordinatorTest {
         // partially update the metadata with one topic first,
         // let the leader refresh metadata during assignment
         metadata.setTopics(singletonList(topic1));
-        metadata.update(TestUtils.singletonCluster(topic1, 1), Collections.<String>emptySet(), time.milliseconds());
+        client.updateMetadata(TestUtils.metadataUpdateWith(1, singletonMap(topic1, 1)));
 
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
@@ -443,7 +443,7 @@ public class ConsumerCoordinatorTest {
             }
         }, syncGroupResponse(Arrays.asList(t1p, t2p), Errors.NONE));
         // expect the client to force a metadata update; when it does, give it both topics
-        client.prepareMetadataUpdate(cluster, Collections.<String>emptySet());
+        client.prepareMetadataUpdate(metadataResponse);
 
         coordinator.poll(time.timer(Long.MAX_VALUE));
 
@@ -463,7 +463,7 @@ public class ConsumerCoordinatorTest {
 
         subscriptions.subscribe(Pattern.compile(".*"), rebalanceListener);
         metadata.needMetadataForAllTopics(true);
-        metadata.update(TestUtils.singletonCluster(topic1, 1), Collections.<String>emptySet(), time.milliseconds());
+        client.updateMetadata(TestUtils.metadataUpdateWith(1, singletonMap(topic1, 1)));
 
         assertEquals(singleton(topic1), subscriptions.subscription());
 
@@ -484,7 +484,7 @@ public class ConsumerCoordinatorTest {
                 final Map<String, Integer> updatedPartitions = new HashMap<>();
                 for (String topic : updatedSubscription)
                     updatedPartitions.put(topic, 1);
-                metadata.update(TestUtils.clusterWith(1, updatedPartitions), Collections.<String>emptySet(), time.milliseconds());
+                client.updateMetadata(TestUtils.metadataUpdateWith(1, updatedPartitions));
                 return true;
             }
         }, syncGroupResponse(singletonList(t1p), Errors.NONE));
@@ -526,15 +526,14 @@ public class ConsumerCoordinatorTest {
         final String consumerId = "consumer";
 
         subscriptions.subscribe(Pattern.compile(".*"), rebalanceListener);
-        metadata.update(TestUtils.singletonCluster(topic1, 1), Collections.<String>emptySet(),
-            time.milliseconds());
+        client.updateMetadata(TestUtils.metadataUpdateWith(1, singletonMap(topic1, 1)));
         assertEquals(singleton(topic1), subscriptions.subscription());
 
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
 
         // Instrument the test so that metadata will contain two topics after next refresh.
-        client.prepareMetadataUpdate(cluster, Collections.emptySet());
+        client.prepareMetadataUpdate(metadataResponse);
 
         client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE));
         client.prepareResponse(new MockClient.RequestMatcher() {
@@ -571,7 +570,7 @@ public class ConsumerCoordinatorTest {
 
         // ensure metadata is up-to-date for leader
         metadata.setTopics(singletonList(topic1));
-        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
+        client.updateMetadata(metadataResponse);
 
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
@@ -671,7 +670,7 @@ public class ConsumerCoordinatorTest {
         // partially update the metadata with one topic first,
         // let the leader refresh metadata during assignment
         metadata.setTopics(singletonList(topic1));
-        metadata.update(TestUtils.singletonCluster(topic1, 1), Collections.<String>emptySet(), time.milliseconds());
+        client.updateMetadata(TestUtils.metadataUpdateWith(1, singletonMap(topic1, 1)));
 
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
@@ -688,7 +687,7 @@ public class ConsumerCoordinatorTest {
             }
         }, syncGroupResponse(Arrays.asList(t1p, t2p), Errors.NONE));
         // expect the client to force a metadata update; when it does, give it both topics
-        client.prepareMetadataUpdate(cluster, Collections.emptySet());
+        client.prepareMetadataUpdate(metadataResponse);
 
         coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE));
 
@@ -847,7 +846,7 @@ public class ConsumerCoordinatorTest {
 
         // ensure metadata is up-to-date for leader
         metadata.setTopics(singletonList(topic1));
-        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
+        client.updateMetadata(metadataResponse);
 
         subscriptions.subscribe(singleton(topic1), rebalanceListener);
 
@@ -866,7 +865,7 @@ public class ConsumerCoordinatorTest {
         assertFalse(coordinator.rejoinNeededOrPending());
 
         // a new partition is added to the topic
-        metadata.update(TestUtils.singletonCluster(topic1, 2), Collections.<String>emptySet(), time.milliseconds());
+        metadata.update(TestUtils.metadataUpdateWith(1, singletonMap(topic1, 2)), time.milliseconds());
 
         // we should detect the change and ask for reassignment
         assertTrue(coordinator.rejoinNeededOrPending());
@@ -886,7 +885,7 @@ public class ConsumerCoordinatorTest {
         metadata.setTopics(topics);
 
         // we only have metadata for one topic initially
-        metadata.update(TestUtils.singletonCluster(topic1, 1), Collections.<String>emptySet(), time.milliseconds());
+        client.updateMetadata(TestUtils.metadataUpdateWith(1, singletonMap(topic1, 1)));
 
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
@@ -907,7 +906,7 @@ public class ConsumerCoordinatorTest {
                     Map<String, Integer> topicPartitionCounts = new HashMap<>();
                     topicPartitionCounts.put(topic1, 1);
                     topicPartitionCounts.put(topic2, 1);
-                    metadata.update(TestUtils.singletonCluster(topicPartitionCounts), Collections.<String>emptySet(), time.milliseconds());
+                    client.updateMetadata(TestUtils.metadataUpdateWith(1, topicPartitionCounts));
                     return true;
                 }
                 return false;
@@ -948,7 +947,7 @@ public class ConsumerCoordinatorTest {
         metadata.setTopics(topics);
 
         // we only have metadata for one topic initially
-        metadata.update(TestUtils.singletonCluster(topic, 1), Collections.emptySet(), time.milliseconds());
+        client.updateMetadata(TestUtils.metadataUpdateWith(1, singletonMap(topic1, 1)));
 
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
@@ -977,33 +976,32 @@ public class ConsumerCoordinatorTest {
 
     @Test
     public void testRebalanceAfterTopicUnavailableWithSubscribe() {
-        unavailableTopicTest(false, false, Collections.<String>emptySet());
+        unavailableTopicTest(false, Collections.emptySet());
     }
 
     @Test
     public void testRebalanceAfterTopicUnavailableWithPatternSubscribe() {
-        unavailableTopicTest(true, false, Collections.<String>emptySet());
+        unavailableTopicTest(true, Collections.emptySet());
     }
 
     @Test
-    public void testRebalanceAfterNotMatchingTopicUnavailableWithPatternSSubscribe() {
-        unavailableTopicTest(true, false, Collections.singleton("notmatching"));
+    public void testRebalanceAfterNotMatchingTopicUnavailableWithPatternSubscribe() {
+        unavailableTopicTest(true, Collections.singleton("notmatching"));
     }
 
     @Test
     public void testAssignWithTopicUnavailable() {
-        unavailableTopicTest(true, false, Collections.<String>emptySet());
+        unavailableTopicTest(true, Collections.emptySet());
     }
 
-    private void unavailableTopicTest(boolean patternSubscribe, boolean assign, Set<String> unavailableTopicsInLastMetadata) {
+    private void unavailableTopicTest(boolean patternSubscribe, Set<String> unavailableTopicsInLastMetadata) {
         final String consumerId = "consumer";
 
         metadata.setTopics(singletonList(topic1));
-        client.prepareMetadataUpdate(Cluster.empty(), Collections.singleton("test1"));
+        client.prepareMetadataUpdate(TestUtils.metadataUpdateWith("kafka-cluster", 1,
+                Collections.singletonMap(topic1, Errors.UNKNOWN_TOPIC_OR_PARTITION), Collections.emptyMap()));
 
-        if (assign)
-            subscriptions.assignFromUser(singleton(t1p));
-        else if (patternSubscribe)
+        if (patternSubscribe)
             subscriptions.subscribe(Pattern.compile("test.*"), rebalanceListener);
         else
             subscriptions.subscribe(singleton(topic1), rebalanceListener);
@@ -1017,32 +1015,34 @@ public class ConsumerCoordinatorTest {
         client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, Errors.NONE));
         client.prepareResponse(syncGroupResponse(Collections.<TopicPartition>emptyList(), Errors.NONE));
         coordinator.poll(time.timer(Long.MAX_VALUE));
-        if (!assign) {
-            assertFalse(coordinator.rejoinNeededOrPending());
-            assertEquals(Collections.<TopicPartition>emptySet(), rebalanceListener.assigned);
-        }
+        assertFalse(coordinator.rejoinNeededOrPending());
+        assertEquals(Collections.<TopicPartition>emptySet(), rebalanceListener.assigned);
         assertTrue("Metadata refresh not requested for unavailable partitions", metadata.updateRequested());
 
-        client.prepareMetadataUpdate(cluster, unavailableTopicsInLastMetadata);
+        Map<String, Errors> topicErrors = new HashMap<>();
+        for (String topic : unavailableTopicsInLastMetadata)
+            topicErrors.put(topic, Errors.UNKNOWN_TOPIC_OR_PARTITION);
+
+        client.prepareMetadataUpdate(TestUtils.metadataUpdateWith("kafka-cluster", 1,
+                topicErrors, singletonMap(topic1, 1)));
+
         client.poll(0, time.milliseconds());
         client.prepareResponse(joinGroupLeaderResponse(2, consumerId, memberSubscriptions, Errors.NONE));
         client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
         coordinator.poll(time.timer(Long.MAX_VALUE));
 
         assertFalse("Metadata refresh requested unnecessarily", metadata.updateRequested());
-        if (!assign) {
-            assertFalse(coordinator.rejoinNeededOrPending());
-            assertEquals(singleton(t1p), rebalanceListener.assigned);
-        }
+        assertFalse(coordinator.rejoinNeededOrPending());
+        assertEquals(singleton(t1p), rebalanceListener.assigned);
     }
 
     @Test
     public void testExcludeInternalTopicsConfigOption() {
         subscriptions.subscribe(Pattern.compile(".*"), rebalanceListener);
 
-        metadata.update(TestUtils.singletonCluster(TestUtils.GROUP_METADATA_TOPIC_NAME, 2), Collections.<String>emptySet(), time.milliseconds());
+        client.updateMetadata(TestUtils.metadataUpdateWith(1, singletonMap(Topic.GROUP_METADATA_TOPIC_NAME, 1)));
 
-        assertFalse(subscriptions.subscription().contains(TestUtils.GROUP_METADATA_TOPIC_NAME));
+        assertFalse(subscriptions.subscription().contains(Topic.GROUP_METADATA_TOPIC_NAME));
     }
 
     @Test
@@ -1050,9 +1050,9 @@ public class ConsumerCoordinatorTest {
         coordinator = buildCoordinator(new Metrics(), assignors, false, false, true);
         subscriptions.subscribe(Pattern.compile(".*"), rebalanceListener);
 
-        metadata.update(TestUtils.singletonCluster(TestUtils.GROUP_METADATA_TOPIC_NAME, 2), Collections.<String>emptySet(), time.milliseconds());
+        client.updateMetadata(TestUtils.metadataUpdateWith(1, singletonMap(Topic.GROUP_METADATA_TOPIC_NAME, 2)));
 
-        assertTrue(subscriptions.subscription().contains(TestUtils.GROUP_METADATA_TOPIC_NAME));
+        assertTrue(subscriptions.subscription().contains(Topic.GROUP_METADATA_TOPIC_NAME));
     }
 
     @Test
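
Three idioms recur across the `ConsumerCoordinatorTest` hunks above. A minimal sketch of each, reusing the fields declared in this file; the comments describe the intended semantics as this patch defines them in `MockClient`, not separate API documentation:

    // 1. Immediate update: the response is applied to Metadata right away.
    client.updateMetadata(TestUtils.metadataUpdateWith(1, singletonMap(topic1, 1)));

    // 2. Deferred update: queued, and applied when the consumer next
    //    requests a metadata refresh.
    client.prepareMetadataUpdate(metadataResponse);

    // 3. Topic unavailability, formerly a Cluster with missing topics, is now
    //    a topic-level error code (four-argument metadataUpdateWith overload).
    client.prepareMetadataUpdate(TestUtils.metadataUpdateWith("kafka-cluster", 1,
            Collections.singletonMap(topic1, Errors.UNKNOWN_TOPIC_OR_PARTITION),
            Collections.emptyMap()));
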
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
index 8f6328d..45b420e 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
@@ -50,10 +50,10 @@ public class ConsumerNetworkClientTest {
 
     private String topicName = "test";
     private MockTime time = new MockTime(1);
-    private MockClient client = new MockClient(time);
     private Cluster cluster = TestUtils.singletonCluster(topicName, 1);
     private Node node = cluster.nodes().get(0);
     private Metadata metadata = new Metadata(0, Long.MAX_VALUE, true);
+    private MockClient client = new MockClient(time, metadata);
     private ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(new LogContext(),
             client, metadata, time, 100, 1000, Integer.MAX_VALUE);
 
@@ -273,7 +273,7 @@ public class ConsumerNetworkClientTest {
         int requestTimeoutMs = 10;
         final AtomicBoolean isReady = new AtomicBoolean();
         final AtomicBoolean disconnected = new AtomicBoolean();
-        client = new MockClient(time) {
+        client = new MockClient(time, metadata) {
             @Override
             public boolean ready(Node node, long now) {
                 if (isReady.get())
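
Both `ConsumerNetworkClientTest` hunks follow from `MockClient` now taking the `Metadata` instance it drives, which makes field declaration order significant. A sketch of the constraint, under the field layout used above:

    // Metadata must be constructed before the MockClient that feeds it,
    // since Java initializes instance fields in declaration order.
    private Metadata metadata = new Metadata(0, Long.MAX_VALUE, true);
    private MockClient client = new MockClient(time, metadata);

Anonymous subclasses that override behaviour, as in the second hunk, simply pass the same two arguments through.
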
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 94d8d5b..c7b0b30 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -113,6 +113,7 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import static java.util.Collections.singleton;
+import static java.util.Collections.singletonMap;
 import static org.apache.kafka.common.requests.FetchMetadata.INVALID_SESSION_ID;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
@@ -122,7 +123,6 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-
 @SuppressWarnings("deprecation")
 public class FetcherTest {
     private ConsumerRebalanceListener listener = new NoOpConsumerRebalanceListener();
@@ -140,12 +140,12 @@ public class FetcherTest {
     private long retryBackoffMs = 100;
     private long requestTimeoutMs = 30000;
     private MockTime time = new MockTime(1);
+    private MetadataResponse initialUpdateResponse = TestUtils.metadataUpdateWith(1, singletonMap(topicName, 4));
     private Metadata metadata = new Metadata(0, Long.MAX_VALUE, true);
     private MockClient client = new MockClient(time, metadata);
-    private Cluster cluster = TestUtils.singletonCluster(topicName, 4);
-    private Node node = cluster.nodes().get(0);
+    private Node node;
     private Metrics metrics = new Metrics(time);
-    FetcherMetricsRegistry metricsRegistry = new FetcherMetricsRegistry("consumer" + groupId);
+    private FetcherMetricsRegistry metricsRegistry = new FetcherMetricsRegistry("consumer" + groupId);
 
     private SubscriptionState subscriptions = new SubscriptionState(OffsetResetStrategy.EARLIEST);
     private SubscriptionState subscriptionsNoAutoReset = new SubscriptionState(OffsetResetStrategy.NONE);
@@ -163,10 +163,9 @@ public class FetcherTest {
     private ExecutorService executorService;
 
     @Before
-    public void setup() throws Exception {
-        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
-        client.setNode(node);
-
+    public void setup() {
+        client.updateMetadata(initialUpdateResponse);
+        node = metadata.fetch().nodes().get(0);
         records = buildRecords(1L, 3, 1);
         nextRecords = buildRecords(4L, 2, 4);
         emptyRecords = buildRecords(0L, 0, 0);
@@ -1204,7 +1203,7 @@ public class FetcherTest {
         assertFalse(subscriptions.hasValidPosition(tp0));
 
         // Expect a metadata refresh
-        client.prepareMetadataUpdate(cluster, Collections.<String>emptySet());
+        client.prepareMetadataUpdate(initialUpdateResponse);
         consumerClient.pollNoWakeup();
         assertFalse(client.hasPendingMetadataUpdates());
 
@@ -1233,7 +1232,7 @@ public class FetcherTest {
         assertFalse(subscriptions.hasValidPosition(tp0));
 
         // Expect a metadata refresh
-        client.prepareMetadataUpdate(cluster, Collections.<String>emptySet());
+        client.prepareMetadataUpdate(initialUpdateResponse);
         consumerClient.pollNoWakeup();
         assertFalse(client.hasPendingMetadataUpdates());
 
@@ -1436,7 +1435,7 @@ public class FetcherTest {
 
         Map<String, List<PartitionInfo>> allTopics = fetcher.getAllTopicMetadata(time.timer(5000L));
 
-        assertEquals(cluster.topics().size(), allTopics.size());
+        assertEquals(initialUpdateResponse.topicMetadata().size(), allTopics.size());
     }
 
     @Test
@@ -1445,7 +1444,7 @@ public class FetcherTest {
         client.prepareResponse(null, true);
         client.prepareResponse(newMetadataResponse(topicName, Errors.NONE));
         Map<String, List<PartitionInfo>> allTopics = fetcher.getAllTopicMetadata(time.timer(5000L));
-        assertEquals(cluster.topics().size(), allTopics.size());
+        assertEquals(initialUpdateResponse.topicMetadata().size(), allTopics.size());
     }
 
     @Test(expected = TimeoutException.class)
@@ -1535,7 +1534,7 @@ public class FetcherTest {
         Assert.assertNotNull(topicMetadata);
         Assert.assertNotNull(topicMetadata.get(topicName));
         //noinspection ConstantConditions
-        Assert.assertEquals((int) cluster.partitionCountForTopic(topicName), topicMetadata.get(topicName).size());
+        Assert.assertEquals((int) metadata.fetch().partitionCountForTopic(topicName), topicMetadata.get(topicName).size());
     }
 
     /*
@@ -1717,8 +1716,7 @@ public class FetcherTest {
         Map<String, Integer> partitionCounts = new HashMap<>();
         partitionCounts.put(topic1, 1);
         partitionCounts.put(topic2, 1);
-        Cluster cluster = TestUtils.clusterWith(1, partitionCounts);
-        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
+        client.updateMetadata(TestUtils.metadataUpdateWith(1, partitionCounts));
 
         subscriptions.assignFromUser(Utils.mkSet(tp1, tp2));
 
@@ -1950,21 +1948,24 @@ public class FetcherTest {
         client.reset();
 
         // Metadata initially has one topic
-        Cluster cluster = TestUtils.clusterWith(3, topicName, 2);
-        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
+        MetadataResponse initialMetadata = TestUtils.metadataUpdateWith(3, singletonMap(topicName, 2));
+        client.updateMetadata(initialMetadata);
 
         // The first metadata refresh should contain one topic
-        client.prepareMetadataUpdate(cluster, Collections.<String>emptySet(), false);
-        client.prepareResponseFrom(listOffsetResponse(tp0, Errors.NONE, 1000L, 11L), cluster.leaderFor(tp0));
-        client.prepareResponseFrom(listOffsetResponse(tp1, Errors.NONE, 1000L, 32L), cluster.leaderFor(tp1));
+        client.prepareMetadataUpdate(initialMetadata, false);
+        client.prepareResponseFrom(listOffsetResponse(tp0, Errors.NONE, 1000L, 11L),
+                metadata.fetch().leaderFor(tp0));
+        client.prepareResponseFrom(listOffsetResponse(tp1, Errors.NONE, 1000L, 32L),
+                metadata.fetch().leaderFor(tp1));
 
         // Second metadata refresh should contain two topics
         Map<String, Integer> partitionNumByTopic = new HashMap<>();
         partitionNumByTopic.put(topicName, 2);
         partitionNumByTopic.put(anotherTopic, 1);
-        Cluster updatedCluster = TestUtils.clusterWith(3, partitionNumByTopic);
-        client.prepareMetadataUpdate(updatedCluster, Collections.<String>emptySet(), false);
-        client.prepareResponseFrom(listOffsetResponse(t2p0, Errors.NONE, 1000L, 54L), cluster.leaderFor(t2p0));
+        MetadataResponse updatedMetadata = TestUtils.metadataUpdateWith(3, partitionNumByTopic);
+        client.prepareMetadataUpdate(updatedMetadata, false);
+        client.prepareResponseFrom(listOffsetResponse(t2p0, Errors.NONE, 1000L, 54L),
+                metadata.fetch().leaderFor(t2p0));
 
         Map<TopicPartition, Long> timestampToSearch = new HashMap<>();
         timestampToSearch.put(tp0, ListOffsetRequest.LATEST_TIMESTAMP);
@@ -2518,9 +2519,11 @@ public class FetcherTest {
         Set<TopicPartition> topicPartitions = new HashSet<>();
         for (int i = 0; i < numPartitions; i++)
             topicPartitions.add(new TopicPartition(topicName, i));
-        cluster = TestUtils.singletonCluster(topicName, numPartitions);
-        metadata.update(cluster, Collections.emptySet(), time.milliseconds());
-        client.setNode(node);
+
+        MetadataResponse initialMetadataResponse = TestUtils.metadataUpdateWith(1,
+                singletonMap(topicName, numPartitions));
+        client.updateMetadata(initialMetadataResponse);
+        node = metadata.fetch().nodes().get(0);
         fetchSize = 10000;
 
         Fetcher<byte[], byte[]> fetcher = new Fetcher<byte[], byte[]>(
@@ -2735,23 +2738,28 @@ public class FetcherTest {
         String topicName2 = "topic2";
         TopicPartition t2p0 = new TopicPartition(topicName2, 0);
         // Expect a metadata refresh.
-        metadata.update(Cluster.bootstrap(ClientUtils.parseAndValidateAddresses(Collections.singletonList("1.1.1.1:1111"), ClientDnsLookup.DEFAULT.toString())),
-                        Collections.<String>emptySet(),
-                        time.milliseconds());
+        metadata.bootstrap(ClientUtils.parseAndValidateAddresses(Collections.singletonList("1.1.1.1:1111"),
+                ClientDnsLookup.DEFAULT), time.milliseconds());
 
         Map<String, Integer> partitionNumByTopic = new HashMap<>();
         partitionNumByTopic.put(topicName, 2);
         partitionNumByTopic.put(topicName2, 1);
-        cluster = TestUtils.clusterWith(2, partitionNumByTopic);
+        MetadataResponse updateMetadataResponse = TestUtils.metadataUpdateWith(2, partitionNumByTopic);
+        Cluster updatedCluster = updateMetadataResponse.cluster();
+
         // The metadata refresh should contain all the topics.
-        client.prepareMetadataUpdate(cluster, Collections.<String>emptySet(), true);
+        client.prepareMetadataUpdate(updateMetadataResponse, true);
 
         // First try should fail due to metadata error.
-        client.prepareResponseFrom(listOffsetResponse(t2p0, errorForP0, offsetForP0, offsetForP0), cluster.leaderFor(t2p0));
-        client.prepareResponseFrom(listOffsetResponse(tp1, errorForP1, offsetForP1, offsetForP1), cluster.leaderFor(tp1));
+        client.prepareResponseFrom(listOffsetResponse(t2p0, errorForP0, offsetForP0, offsetForP0),
+                updatedCluster.leaderFor(t2p0));
+        client.prepareResponseFrom(listOffsetResponse(tp1, errorForP1, offsetForP1, offsetForP1),
+                updatedCluster.leaderFor(tp1));
         // Second try should succeed.
-        client.prepareResponseFrom(listOffsetResponse(t2p0, Errors.NONE, offsetForP0, offsetForP0), cluster.leaderFor(t2p0));
-        client.prepareResponseFrom(listOffsetResponse(tp1, Errors.NONE, offsetForP1, offsetForP1), cluster.leaderFor(tp1));
+        client.prepareResponseFrom(listOffsetResponse(t2p0, Errors.NONE, offsetForP0, offsetForP0),
+                updatedCluster.leaderFor(t2p0));
+        client.prepareResponseFrom(listOffsetResponse(tp1, Errors.NONE, offsetForP1, offsetForP1),
+                updatedCluster.leaderFor(tp1));
 
         Map<TopicPartition, Long> timestampToSearch = new HashMap<>();
         timestampToSearch.put(t2p0, 0L);
@@ -2777,15 +2785,16 @@ public class FetcherTest {
     private void testGetOffsetsForTimesWithUnknownOffset() {
         client.reset();
         // Ensure metadata has both partition.
-        Cluster cluster = TestUtils.clusterWith(1, topicName, 1);
-        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
+        MetadataResponse initialMetadataUpdate = TestUtils.metadataUpdateWith(1, singletonMap(topicName, 1));
+        client.updateMetadata(initialMetadataUpdate);
 
         Map<TopicPartition, ListOffsetResponse.PartitionData> partitionData = new HashMap<>();
         partitionData.put(tp0, new ListOffsetResponse.PartitionData(Errors.NONE,
                 ListOffsetResponse.UNKNOWN_TIMESTAMP, ListOffsetResponse.UNKNOWN_OFFSET,
                 Optional.empty()));
 
-        client.prepareResponseFrom(new ListOffsetResponse(0, partitionData), cluster.leaderFor(tp0));
+        client.prepareResponseFrom(new ListOffsetResponse(0, partitionData),
+                metadata.fetch().leaderFor(tp0));
 
         Map<TopicPartition, Long> timestampToSearch = new HashMap<>();
         timestampToSearch.put(tp0, 0L);
@@ -2851,20 +2860,20 @@ public class FetcherTest {
     private MetadataResponse newMetadataResponse(String topic, Errors error) {
         List<MetadataResponse.PartitionMetadata> partitionsMetadata = new ArrayList<>();
         if (error == Errors.NONE) {
-            for (PartitionInfo partitionInfo : cluster.partitionsForTopic(topic)) {
-                partitionsMetadata.add(new MetadataResponse.PartitionMetadata(
-                        Errors.NONE,
-                        partitionInfo.partition(),
-                        partitionInfo.leader(),
-                        Optional.empty(),
-                        Arrays.asList(partitionInfo.replicas()),
-                        Arrays.asList(partitionInfo.inSyncReplicas()),
-                        Arrays.asList(partitionInfo.offlineReplicas())));
-            }
+            Optional<MetadataResponse.TopicMetadata> foundMetadata = initialUpdateResponse.topicMetadata()
+                    .stream()
+                    .filter(topicMetadata -> topicMetadata.topic().equals(topic))
+                    .findFirst();
+            foundMetadata.ifPresent(topicMetadata -> {
+                partitionsMetadata.addAll(topicMetadata.partitionMetadata());
+            });
         }
 
-        MetadataResponse.TopicMetadata topicMetadata = new MetadataResponse.TopicMetadata(error, topic, false, partitionsMetadata);
-        return new MetadataResponse(cluster.nodes(), null, MetadataResponse.NO_CONTROLLER_ID, Arrays.asList(topicMetadata));
+        MetadataResponse.TopicMetadata topicMetadata = new MetadataResponse.TopicMetadata(error, topic, false,
+                partitionsMetadata);
+        List<Node> brokers = new ArrayList<>(initialUpdateResponse.brokers());
+        return new MetadataResponse(brokers, initialUpdateResponse.clusterId(),
+                initialUpdateResponse.controller().id(), Collections.singletonList(topicMetadata));
     }
 
     private Fetcher<byte[], byte[]> createFetcher(SubscriptionState subscriptions,
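
With the standalone `cluster` field gone from `FetcherTest`, the changes above settle on two ways to obtain a cluster view. A sketch of both, using names from the `getOffsetsForTimes` hunks:

    // Once an update has been applied, read the live view from Metadata:
    Node leader = metadata.fetch().leaderFor(tp0);

    // For a response that is only queued via prepareMetadataUpdate, derive
    // the view from the response itself, before it has been applied:
    MetadataResponse updateMetadataResponse = TestUtils.metadataUpdateWith(2, partitionNumByTopic);
    Cluster updatedCluster = updateMetadataResponse.cluster();
    client.prepareMetadataUpdate(updateMetadataResponse, true);
    client.prepareResponseFrom(listOffsetResponse(t2p0, errorForP0, offsetForP0, offsetForP0),
            updatedCluster.leaderFor(t2p0));
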
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index b5d7709..b544a65 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -16,9 +16,6 @@
  */
 package org.apache.kafka.clients.producer;
 
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.Set;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.KafkaClient;
 import org.apache.kafka.clients.Metadata;
@@ -37,6 +34,8 @@ import org.apache.kafka.common.header.internals.RecordHeader;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.network.Selectable;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.ExtendedSerializer;
 import org.apache.kafka.common.serialization.Serializer;
@@ -52,10 +51,12 @@ import org.apache.kafka.test.TestUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.ExecutorService;
@@ -66,6 +67,8 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.junit.Assert.assertArrayEquals;
+import static java.util.Collections.emptyMap;
+import static java.util.Collections.singletonMap;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -209,20 +212,16 @@ public class KafkaProducerTest {
 
     @Test
     public void shouldCloseProperlyAndThrowIfInterrupted() throws Exception {
-        Map<String, Object> configs = new HashMap();
+        Map<String, Object> configs = new HashMap<>();
         configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
         configs.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MockPartitioner.class.getName());
         configs.put(ProducerConfig.BATCH_SIZE_CONFIG, "1");
 
         Time time = new MockTime();
-        Cluster cluster = TestUtils.singletonCluster("topic", 1);
-        Node node = cluster.nodes().get(0);
-
+        MetadataResponse initialUpdateResponse = TestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
         Metadata metadata = new Metadata(0, Long.MAX_VALUE, true);
-        metadata.update(cluster, Collections.emptySet(), time.milliseconds());
-
         MockClient client = new MockClient(time, metadata);
-        client.setNode(node);
+        client.updateMetadata(initialUpdateResponse);
 
         final Producer<String, String> producer = new KafkaProducer<>(configs, new StringSerializer(),
                 new StringSerializer(), metadata, client, null, time);
@@ -443,7 +442,9 @@ public class KafkaProducerTest {
                 for (int i = 0; i < 10; i++) {
                     while (!metadata.updateRequested() && System.currentTimeMillis() - startTimeMs < 1000)
                         Thread.yield();
-                    metadata.update(Cluster.empty(), Collections.singleton(topic), time.milliseconds());
+                    MetadataResponse updateResponse = TestUtils.metadataUpdateWith("kafka-cluster", 1,
+                            singletonMap(topic, Errors.UNKNOWN_TOPIC_OR_PARTITION), emptyMap());
+                    metadata.update(updateResponse, time.milliseconds());
                     time.sleep(60 * 1000L);
                 }
             });
@@ -479,14 +480,9 @@ public class KafkaProducerTest {
         Serializer<String> valueSerializer = mock(serializerClassToMock);
 
         String topic = "topic";
-        final Cluster cluster = new Cluster(
-                "dummy",
-                Collections.singletonList(new Node(0, "host1", 1000)),
-                Collections.singletonList(new PartitionInfo(topic, 0, null, new Node[0], new Node[0])),
-                Collections.emptySet(),
-                Collections.emptySet());
         Metadata metadata = new Metadata(0, 90000, true);
-        metadata.update(cluster, Collections.emptySet(), Time.SYSTEM.milliseconds());
+        MetadataResponse initialUpdateResponse = TestUtils.metadataUpdateWith(1, singletonMap(topic, 1));
+        metadata.update(initialUpdateResponse, Time.SYSTEM.milliseconds());
 
         KafkaProducer<String, String> producer = new KafkaProducer<>(configs, keySerializer, valueSerializer, metadata,
                 null, null, Time.SYSTEM);
@@ -553,13 +549,8 @@ public class KafkaProducerTest {
         ProducerRecord<String, String> record = new ProducerRecord<>(topic, "value");
 
         Metadata metadata = new Metadata(0, 90000, true);
-        final Cluster cluster = new Cluster(
-            "dummy",
-            Collections.singletonList(new Node(0, "host1", 1000)),
-            Collections.singletonList(new PartitionInfo(topic, 0, null, new Node[0], new Node[0])),
-            Collections.emptySet(),
-            Collections.emptySet());
-        metadata.update(cluster, Collections.emptySet(), Time.SYSTEM.milliseconds());
+        MetadataResponse initialUpdateResponse = TestUtils.metadataUpdateWith(1, singletonMap(topic, 1));
+        metadata.update(initialUpdateResponse, Time.SYSTEM.milliseconds());
 
         @SuppressWarnings("unchecked") // it is safe to suppress, since this is a mock class
                 ProducerInterceptors<String, String> interceptors = mock(ProducerInterceptors.class);
@@ -595,15 +586,12 @@ public class KafkaProducerTest {
         configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5);
         configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
 
-        Time time = Time.SYSTEM;
-        Cluster cluster = TestUtils.singletonCluster("topic", 1);
-        Node node = cluster.nodes().get(0);
-
+        Time time = new MockTime(1);
+        MetadataResponse initialUpdateResponse = TestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
         Metadata metadata = new Metadata(0, Long.MAX_VALUE, true);
-        metadata.update(cluster, Collections.emptySet(), time.milliseconds());
+        metadata.update(initialUpdateResponse, time.milliseconds());
 
         MockClient client = new MockClient(time, metadata);
-        client.setNode(node);
 
         try (Producer<String, String> producer = new KafkaProducer<>(configs, new StringSerializer(),
                 new StringSerializer(), metadata, client, null, time)) {
@@ -620,14 +608,11 @@ public class KafkaProducerTest {
         configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
 
         Time time = new MockTime();
-        Cluster cluster = TestUtils.singletonCluster("topic", 1);
-        Node node = cluster.nodes().get(0);
-
+        MetadataResponse initialUpdateResponse = TestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
         Metadata metadata = new Metadata(0, Long.MAX_VALUE, true);
-        metadata.update(cluster, Collections.emptySet(), time.milliseconds());
+        metadata.update(initialUpdateResponse, time.milliseconds());
 
         MockClient client = new MockClient(time, metadata);
-        client.setNode(node);
 
         Producer<String, String> producer = new KafkaProducer<>(configs, new StringSerializer(), new StringSerializer(),
                 metadata, client, null, time);
@@ -651,14 +636,11 @@ public class KafkaProducerTest {
         configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "15000");
 
         Time time = new MockTime();
-        Cluster cluster = TestUtils.singletonCluster();
-        Node node = cluster.nodes().get(0);
-
+        MetadataResponse initialUpdateResponse = TestUtils.metadataUpdateWith(1, emptyMap());
         Metadata metadata = new Metadata(0, Long.MAX_VALUE, true);
-        metadata.update(cluster, Collections.emptySet(), time.milliseconds());
+        metadata.update(initialUpdateResponse, time.milliseconds());
 
         MockClient client = new MockClient(time, metadata);
-        client.setNode(node);
 
         Producer<String, String> producer = new KafkaProducer<>(configs, new StringSerializer(), new StringSerializer(),
                 metadata, client, null, time);
@@ -666,20 +648,19 @@ public class KafkaProducerTest {
         String invalidTopicName = "topic abc"; // Invalid topic name due to space
         ProducerRecord<String, String> record = new ProducerRecord<>(invalidTopicName, "HelloKafka");
 
-        Set<String> invalidTopic = new HashSet<>();
-        invalidTopic.add(invalidTopicName);
-        Cluster metaDataUpdateResponseCluster = new Cluster(cluster.clusterResource().clusterId(),
-                cluster.nodes(),
-                new ArrayList<>(0),
-                Collections.emptySet(),
-                invalidTopic,
-                cluster.internalTopics(),
-                cluster.controller());
-        client.prepareMetadataUpdate(metaDataUpdateResponseCluster, Collections.emptySet());
+        List<MetadataResponse.TopicMetadata> topicMetadata = new ArrayList<>();
+        topicMetadata.add(new MetadataResponse.TopicMetadata(Errors.INVALID_TOPIC_EXCEPTION,
+                invalidTopicName, false, Collections.emptyList()));
+        MetadataResponse updateResponse = new MetadataResponse(
+                new ArrayList<>(initialUpdateResponse.brokers()),
+                initialUpdateResponse.clusterId(),
+                initialUpdateResponse.controller().id(),
+                topicMetadata);
+        client.prepareMetadataUpdate(updateResponse);
 
         Future<RecordMetadata> future = producer.send(record);
 
-        assertEquals("Cluster has incorrect invalid topic list", metaDataUpdateResponseCluster.invalidTopics(),
+        assertEquals("Cluster has incorrect invalid topic list.", Collections.singleton(invalidTopicName),
                 metadata.fetch().invalidTopics());
         TestUtils.assertFutureError(future, InvalidTopicException.class);
 
@@ -697,12 +678,10 @@ public class KafkaProducerTest {
         // return with a KafkaException.
         String topicName = "test";
         Time time = new MockTime();
-        Cluster cluster = TestUtils.singletonCluster();
-        Node node = cluster.nodes().get(0);
-        Metadata metadata = new Metadata(0, Long.MAX_VALUE, false);
-        metadata.update(cluster, Collections.emptySet(), time.milliseconds());
+        MetadataResponse initialUpdateResponse = TestUtils.metadataUpdateWith(1, emptyMap());
+        Metadata metadata = new Metadata(0, Long.MAX_VALUE, true);
+        metadata.update(initialUpdateResponse, time.milliseconds());
         MockClient client = new MockClient(time, metadata);
-        client.setNode(node);
 
         Producer<String, String> producer = new KafkaProducer<>(configs, new StringSerializer(), new StringSerializer(),
                 metadata, client, null, time);
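
Where no `TestUtils` helper covers an error case, the producer tests above now assemble the `MetadataResponse` by hand, reusing the broker list, cluster id, and controller from an earlier response. A sketch of the invalid-topic case from the diff:

    // Topic-level INVALID_TOPIC_EXCEPTION with no partitions; everything else
    // is copied from initialUpdateResponse, so only the error is new.
    MetadataResponse.TopicMetadata invalidTopic = new MetadataResponse.TopicMetadata(
            Errors.INVALID_TOPIC_EXCEPTION, invalidTopicName, false, Collections.emptyList());
    MetadataResponse updateResponse = new MetadataResponse(
            new ArrayList<>(initialUpdateResponse.brokers()),
            initialUpdateResponse.clusterId(),
            initialUpdateResponse.controller().id(),
            Collections.singletonList(invalidTopic));
    client.prepareMetadataUpdate(updateResponse);
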
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 8a8ddd3..606637e 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -73,6 +73,7 @@ import org.apache.kafka.common.requests.ApiVersionsResponse;
 import org.apache.kafka.common.requests.FindCoordinatorResponse;
 import org.apache.kafka.common.requests.InitProducerIdRequest;
 import org.apache.kafka.common.requests.InitProducerIdResponse;
+import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.requests.ProduceRequest;
 import org.apache.kafka.common.requests.ProduceResponse;
 import org.apache.kafka.common.requests.ResponseHeader;
@@ -115,11 +116,10 @@ public class SenderTest {
     private TopicPartition tp0 = new TopicPartition("test", 0);
     private TopicPartition tp1 = new TopicPartition("test", 1);
     private MockTime time = new MockTime();
-    private MockClient client = new MockClient(time);
     private int batchSize = 16 * 1024;
     private Metadata metadata = new Metadata(0, Long.MAX_VALUE, true, true, new ClusterResourceListeners());
+    private MockClient client = new MockClient(time, metadata);
     private ApiVersions apiVersions = new ApiVersions();
-    private Cluster cluster = TestUtils.singletonCluster("test", 2);
     private Metrics metrics = null;
     private RecordAccumulator accumulator = null;
     private Sender sender = null;
@@ -128,7 +128,6 @@ public class SenderTest {
 
     @Before
     public void setup() {
-        client.setNode(cluster.nodes().get(0));
         setupWithTransactionState(null);
     }
 
@@ -395,8 +394,8 @@ public class SenderTest {
             Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
                     senderMetrics, time, REQUEST_TIMEOUT, 50, null, apiVersions);
             // Create a two broker cluster, with partition 0 on broker 0 and partition 1 on broker 1
-            Cluster cluster1 = TestUtils.clusterWith(2, "test", 2);
-            metadata.update(cluster1, Collections.emptySet(), time.milliseconds());
+            MetadataResponse metadataUpdate1 = TestUtils.metadataUpdateWith(2, Collections.singletonMap("test", 2));
+            client.prepareMetadataUpdate(metadataUpdate1);
 
             // Send the first message.
             TopicPartition tp2 = new TopicPartition("test", 1);
@@ -416,8 +415,8 @@ public class SenderTest {
             accumulator.append(tp2, 0L, "key2".getBytes(), "value2".getBytes(), null, null, MAX_BLOCK_TIMEOUT);
 
             // Update metadata before sender receives response from broker 0. Now partition 2 moves to broker 0
-            Cluster cluster2 = TestUtils.singletonCluster("test", 2);
-            metadata.update(cluster2, Collections.emptySet(), time.milliseconds());
+            MetadataResponse metadataUpdate2 = TestUtils.metadataUpdateWith(1, Collections.singletonMap("test", 2));
+            client.prepareMetadataUpdate(metadataUpdate2);
             // Sender should not send the second message to node 0.
             assertEquals(1, sender.inFlightBatches(tp2).size());
             sender.run(time.milliseconds());  // receive the response for the previous send, and send the new batch
@@ -458,9 +457,9 @@ public class SenderTest {
         // Advance the clock to expire the first batch.
         time.sleep(10000);
 
-        Node clusterNode = this.cluster.nodes().get(0);
+        Node clusterNode = metadata.fetch().nodes().get(0);
         Map<Integer, List<ProducerBatch>> drainedBatches =
-            accumulator.drain(cluster, Collections.singleton(clusterNode), Integer.MAX_VALUE, time.milliseconds());
+            accumulator.drain(metadata.fetch(), Collections.singleton(clusterNode), Integer.MAX_VALUE, time.milliseconds());
         sender.addToInflightBatches(drainedBatches);
 
         // Disconnect the target node for the pending produce request. This will ensure that sender will try to
@@ -484,12 +483,12 @@ public class SenderTest {
     @Test
     public void testMetadataTopicExpiry() throws Exception {
         long offset = 0;
-        metadata.update(Cluster.empty(), Collections.<String>emptySet(), time.milliseconds());
+        client.updateMetadata(TestUtils.metadataUpdateWith(1, Collections.emptyMap()));
 
         Future<RecordMetadata> future = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
         sender.run(time.milliseconds());
         assertTrue("Topic not added to metadata", metadata.containsTopic(tp0.topic()));
-        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
+        client.updateMetadata(TestUtils.metadataUpdateWith(1, Collections.singletonMap("test", 2)));
         sender.run(time.milliseconds());  // send produce request
         client.respond(produceResponse(tp0, offset++, Errors.NONE, 0));
         sender.run(time.milliseconds());
@@ -501,12 +500,12 @@ public class SenderTest {
 
         assertTrue("Topic not retained in metadata list", metadata.containsTopic(tp0.topic()));
         time.sleep(Metadata.TOPIC_EXPIRY_MS);
-        metadata.update(Cluster.empty(), Collections.<String>emptySet(), time.milliseconds());
+        client.updateMetadata(TestUtils.metadataUpdateWith(1, Collections.emptyMap()));
         assertFalse("Unused topic has not been expired", metadata.containsTopic(tp0.topic()));
         future = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
         sender.run(time.milliseconds());
         assertTrue("Topic not added to metadata", metadata.containsTopic(tp0.topic()));
-        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
+        client.updateMetadata(TestUtils.metadataUpdateWith(1, Collections.singletonMap("test", 2)));
         sender.run(time.milliseconds());  // send produce request
         client.respond(produceResponse(tp0, offset++, Errors.NONE, 0));
         sender.run(time.milliseconds());
@@ -522,7 +521,6 @@ public class SenderTest {
         final long producerId = 343434L;
         TransactionManager transactionManager = new TransactionManager();
         setupWithTransactionState(transactionManager);
-        client.setNode(new Node(1, "localhost", 33343));
         prepareAndReceiveInitProducerId(producerId, Errors.NONE);
         assertTrue(transactionManager.hasProducerId());
         assertEquals(producerId, transactionManager.producerIdAndEpoch().producerId);
@@ -534,7 +532,6 @@ public class SenderTest {
         final long producerId = 343434L;
         TransactionManager transactionManager = new TransactionManager();
         setupWithTransactionState(transactionManager);
-        client.setNode(new Node(1, "localhost", 33343));
         prepareAndReceiveInitProducerId(producerId, Errors.CLUSTER_AUTHORIZATION_FAILED);
         assertFalse(transactionManager.hasProducerId());
         assertTrue(transactionManager.hasError());
@@ -988,7 +985,7 @@ public class SenderTest {
 
         // Send first ProduceRequest
         Future<RecordMetadata> request1 = accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
-        Node node = this.cluster.nodes().get(0);
+        Node node = metadata.fetch().nodes().get(0);
         time.sleep(10000L);
         client.disconnect(node.idString());
         client.blackout(node, 10);
@@ -1024,7 +1021,7 @@ public class SenderTest {
         sender.run(time.milliseconds());  // receive first response
         assertEquals(1, sender.inFlightBatches(tp0).size());
 
-        Node node = this.cluster.nodes().get(0);
+        Node node = metadata.fetch().nodes().get(0);
         // We add 600 millis to expire the first batch but not the second.
         // Note deliveryTimeoutMs is 1500.
         time.sleep(600L);
@@ -1088,7 +1085,7 @@ public class SenderTest {
         sendIdempotentProducerResponse(0, tp0, Errors.NOT_LEADER_FOR_PARTITION, -1);
         sender.run(time.milliseconds());  // receive first response
 
-        Node node = this.cluster.nodes().get(0);
+        Node node = metadata.fetch().nodes().get(0);
         time.sleep(1000L);
         client.disconnect(node.idString());
         client.blackout(node, 10);
@@ -1136,7 +1133,7 @@ public class SenderTest {
         sender.run(time.milliseconds());  // receive response
         assertEquals(1L, transactionManager.sequenceNumber(tp0).longValue());
 
-        Node node = this.cluster.nodes().get(0);
+        Node node = metadata.fetch().nodes().get(0);
         time.sleep(15000L);
         client.disconnect(node.idString());
         client.blackout(node, 10);
@@ -1160,7 +1157,6 @@ public class SenderTest {
         TransactionManager transactionManager = new TransactionManager();
         transactionManager.setProducerIdAndEpoch(new ProducerIdAndEpoch(producerId, (short) 0));
         setupWithTransactionState(transactionManager);
-        client.setNode(new Node(1, "localhost", 33343));
 
         int maxRetries = 10;
         Metrics m = new Metrics();
@@ -1206,7 +1202,6 @@ public class SenderTest {
         TransactionManager transactionManager = new TransactionManager();
         transactionManager.setProducerIdAndEpoch(new ProducerIdAndEpoch(producerId, (short) 0));
         setupWithTransactionState(transactionManager);
-        client.setNode(new Node(1, "localhost", 33343));
 
         int maxRetries = 10;
         Metrics m = new Metrics();
@@ -1576,7 +1571,6 @@ public class SenderTest {
         TransactionManager transactionManager = new TransactionManager();
         setupWithTransactionState(transactionManager);
 
-        client.setNode(new Node(1, "localhost", 33343));
         prepareAndReceiveInitProducerId(producerId, Errors.NONE);
         assertTrue(transactionManager.hasProducerId());
 
@@ -1604,7 +1598,6 @@ public class SenderTest {
         TransactionManager transactionManager = new TransactionManager();
         setupWithTransactionState(transactionManager);
 
-        client.setNode(new Node(1, "localhost", 33343));
         prepareAndReceiveInitProducerId(producerId, Errors.NONE);
         assertTrue(transactionManager.hasProducerId());
 
@@ -1647,7 +1640,6 @@ public class SenderTest {
         TransactionManager transactionManager = new TransactionManager();
         setupWithTransactionState(transactionManager);
 
-        client.setNode(new Node(1, "localhost", 33343));
         prepareAndReceiveInitProducerId(producerId, Errors.NONE);
         assertTrue(transactionManager.hasProducerId());
 
@@ -1673,7 +1665,6 @@ public class SenderTest {
         TransactionManager transactionManager = new TransactionManager();
         setupWithTransactionState(transactionManager);
 
-        client.setNode(new Node(1, "localhost", 33343));
         prepareAndReceiveInitProducerId(producerId, Errors.NONE);
         assertTrue(transactionManager.hasProducerId());
 
@@ -1700,7 +1691,6 @@ public class SenderTest {
         TransactionManager transactionManager = new TransactionManager();
         transactionManager.setProducerIdAndEpoch(new ProducerIdAndEpoch(producerId, (short) 0));
         setupWithTransactionState(transactionManager);
-        client.setNode(new Node(1, "localhost", 33343));
 
         int maxRetries = 10;
         Metrics m = new Metrics();
@@ -1745,7 +1735,6 @@ public class SenderTest {
         TransactionManager transactionManager = new TransactionManager();
         transactionManager.setProducerIdAndEpoch(new ProducerIdAndEpoch(producerId, (short) 0));
         setupWithTransactionState(transactionManager);
-        client.setNode(new Node(1, "localhost", 33343));
 
         int maxRetries = 10;
         Metrics m = new Metrics();
@@ -1784,7 +1773,6 @@ public class SenderTest {
         TransactionManager transactionManager = new TransactionManager();
         transactionManager.setProducerIdAndEpoch(new ProducerIdAndEpoch(producerId, (short) 0));
         setupWithTransactionState(transactionManager);
-        client.setNode(new Node(1, "localhost", 33343));
 
         int maxRetries = 10;
         Metrics m = new Metrics();
@@ -1853,8 +1841,8 @@ public class SenderTest {
             Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
                     senderMetrics, time, REQUEST_TIMEOUT, 1000L, txnManager, new ApiVersions());
             // Create a two broker cluster, with partition 0 on broker 0 and partition 1 on broker 1
-            Cluster cluster1 = TestUtils.clusterWith(2, topic, 2);
-            metadata.update(cluster1, Collections.<String>emptySet(), time.milliseconds());
+            MetadataResponse metadataUpdate1 = TestUtils.metadataUpdateWith(2, Collections.singletonMap(topic, 2));
+            client.prepareMetadataUpdate(metadataUpdate1);
             // Send the first message.
             Future<RecordMetadata> f1 =
                     accumulator.append(tp, 0L, "key1".getBytes(), new byte[batchSize / 2], null, null, MAX_BLOCK_TIMEOUT).future;
@@ -2068,7 +2056,7 @@ public class SenderTest {
 
     @Test
     public void testResetNextBatchExpiry() throws Exception {
-        client = spy(new MockClient(time));
+        client = spy(new MockClient(time, metadata));
 
         setupWithTransactionState(null);
 
@@ -2184,25 +2172,20 @@ public class SenderTest {
     }
 
     private void setupWithTransactionState(TransactionManager transactionManager, boolean guaranteeOrder, BufferPool customPool) {
+        long deliveryTimeoutMs = 1500L;
         long totalSize = 1024 * 1024;
         String metricGrpName = "producer-metrics";
-        Map<String, String> metricTags = new LinkedHashMap<>();
-        metricTags.put("client-id", CLIENT_ID);
-        MetricConfig metricConfig = new MetricConfig().tags(metricTags);
+        MetricConfig metricConfig = new MetricConfig().tags(Collections.singletonMap("client-id", CLIENT_ID));
         this.metrics = new Metrics(metricConfig, time);
         BufferPool pool = (customPool == null) ? new BufferPool(totalSize, batchSize, metrics, time, metricGrpName) : customPool;
-        setupWithTransactionState(transactionManager, guaranteeOrder, metricTags, pool);
-    }
 
-    private void setupWithTransactionState(TransactionManager transactionManager, boolean guaranteeOrder, Map<String, String> metricTags, BufferPool pool) {
-        long deliveryTimeoutMs = 1500L;
-        String metricGrpName = "producer-metrics";
         this.accumulator = new RecordAccumulator(logContext, batchSize, CompressionType.NONE, 0L, 0L,
-            deliveryTimeoutMs, metrics, metricGrpName, time, apiVersions, transactionManager, pool);
+                deliveryTimeoutMs, metrics, metricGrpName, time, apiVersions, transactionManager, pool);
         this.senderMetricsRegistry = new SenderMetricsRegistry(this.metrics);
         this.sender = new Sender(logContext, this.client, this.metadata, this.accumulator, guaranteeOrder, MAX_REQUEST_SIZE, ACKS_ALL,
-            Integer.MAX_VALUE, this.senderMetricsRegistry, this.time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
-        this.metadata.update(this.cluster, Collections.emptySet(), time.milliseconds());
+                Integer.MAX_VALUE, this.senderMetricsRegistry, this.time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
+
+        this.client.updateMetadata(TestUtils.metadataUpdateWith(1, Collections.singletonMap("test", 2)));
     }
 
     private void assertSendFailure(Class<? extends RuntimeException> expectedError) throws Exception {
@@ -2244,7 +2227,7 @@ public class SenderTest {
     }
 
     private void prepareFindCoordinatorResponse(Errors error) {
-        client.prepareResponse(new FindCoordinatorResponse(error, cluster.nodes().get(0)));
+        client.prepareResponse(new FindCoordinatorResponse(error, metadata.fetch().nodes().get(0)));
     }
 
     private void prepareInitPidResponse(Errors error, long pid, short epoch) {
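
Taken together, the SenderTest changes above follow a single migration: rather than constructing a MockClient without metadata and seeding it via client.setNode(...) plus metadata.update(cluster, ...), the client is now built against the shared Metadata instance and seeded through a full MetadataResponse. A condensed sketch of the new setup, using only constructors and helpers that appear in this patch:

    // New-style test wiring (names as used in SenderTest):
    Metadata metadata = new Metadata(0, Long.MAX_VALUE, true, true, new ClusterResourceListeners());
    MockClient client = new MockClient(time, metadata);

    // Seed a one-node cluster carrying two partitions of "test"; MockClient
    // feeds the response into the Metadata instance it was built with.
    client.updateMetadata(TestUtils.metadataUpdateWith(1, Collections.singletonMap("test", 2)));

    // Nodes are read back from metadata instead of a cached Cluster field.
    Node node = metadata.fetch().nodes().get(0);
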
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index cf730b9..72c0a0b 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -102,11 +102,10 @@ public class TransactionManagerTest {
     private TopicPartition tp0 = new TopicPartition(topic, 0);
     private TopicPartition tp1 = new TopicPartition(topic, 1);
     private MockTime time = new MockTime();
-    private MockClient client = new MockClient(time);
-
     private Metadata metadata = new Metadata(0, Long.MAX_VALUE, true, true, new ClusterResourceListeners());
+    private MockClient client = new MockClient(time, metadata);
+
     private ApiVersions apiVersions = new ApiVersions();
-    private Cluster cluster = TestUtils.singletonCluster("test", 2);
     private RecordAccumulator accumulator = null;
     private Sender sender = null;
     private TransactionManager transactionManager = null;
@@ -133,8 +132,7 @@ public class TransactionManagerTest {
                 new BufferPool(totalSize, batchSize, metrics, time, metricGrpName));
         this.sender = new Sender(logContext, this.client, this.metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL,
                 MAX_RETRIES, senderMetrics, this.time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
-        this.metadata.update(this.cluster, Collections.<String>emptySet(), time.milliseconds());
-        client.setNode(brokerNode);
+        this.client.updateMetadata(TestUtils.metadataUpdateWith(1, singletonMap("test", 2)));
     }
 
     @Test
@@ -2042,7 +2040,7 @@ public class TransactionManagerTest {
         time.sleep(10000);
         // Disconnect the target node for the pending produce request. This will ensure that the sender
         // will try to expire the batch.
-        Node clusterNode = this.cluster.nodes().get(0);
+        Node clusterNode = metadata.fetch().nodes().get(0);
         client.disconnect(clusterNode.idString());
         client.blackout(clusterNode, 100);
 
@@ -2098,7 +2096,7 @@ public class TransactionManagerTest {
         time.sleep(10000);
         // Disconnect the target node for the pending produce request. This will ensure that the sender
         // will try to expire the batch.
-        Node clusterNode = this.cluster.nodes().get(0);
+        Node clusterNode = metadata.fetch().nodes().get(0);
         client.disconnect(clusterNode.idString());
         client.blackout(clusterNode, 100);
 
@@ -2155,7 +2153,7 @@ public class TransactionManagerTest {
         time.sleep(10000);
         // Disconnect the target node for the pending produce request. This will ensure that the sender
         // will try to expire the batch.
-        Node clusterNode = this.cluster.nodes().get(0);
+        Node clusterNode = metadata.fetch().nodes().get(0);
         client.disconnect(clusterNode.idString());
         client.blackout(clusterNode, 100);
 
@@ -2226,7 +2224,7 @@ public class TransactionManagerTest {
         time.sleep(10000);
         // Disconnect the target node for the pending produce request. This will ensure that the sender
         // will try to expire the batch.
-        Node clusterNode = this.cluster.nodes().get(0);
+        Node clusterNode = metadata.fetch().nodes().get(0);
         client.disconnect(clusterNode.idString());
         client.blackout(clusterNode, 100);
 
diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
index afb342b..50d1eeb 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
@@ -21,9 +21,12 @@ import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.internals.Topic;
 import org.apache.kafka.common.network.NetworkReceive;
 import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.requests.RequestHeader;
 import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
@@ -42,15 +45,16 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.function.Supplier;
-import java.util.concurrent.ExecutionException;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import static java.util.Arrays.asList;
 import static org.junit.Assert.assertEquals;
@@ -70,9 +74,6 @@ public class TestUtils {
     public static final String DIGITS = "0123456789";
     public static final String LETTERS_AND_DIGITS = LETTERS + DIGITS;
 
-    public static final String GROUP_METADATA_TOPIC_NAME = "__consumer_offsets";
-    public static final Set<String> INTERNAL_TOPICS = Collections.singleton(GROUP_METADATA_TOPIC_NAME);
-
     /* A consistent random number generator to make tests repeatable */
     public static final Random SEEDED_RANDOM = new Random(192348092834L);
     public static final Random RANDOM = new Random();
@@ -105,7 +106,52 @@ public class TestUtils {
             for (int i = 0; i < partitions; i++)
                 parts.add(new PartitionInfo(topic, i, ns[i % ns.length], ns, ns));
         }
-        return new Cluster("kafka-cluster", asList(ns), parts, Collections.<String>emptySet(), INTERNAL_TOPICS);
+        return new Cluster("kafka-cluster", asList(ns), parts, Collections.emptySet(), Topic.INTERNAL_TOPICS);
+    }
+
+    public static MetadataResponse metadataUpdateWith(final int numNodes,
+                                                      final Map<String, Integer> topicPartitionCounts) {
+        return metadataUpdateWith("kafka-cluster", numNodes, topicPartitionCounts);
+    }
+
+    public static MetadataResponse metadataUpdateWith(final String clusterId,
+                                                      final int numNodes,
+                                                      final Map<String, Integer> topicPartitionCounts) {
+        return metadataUpdateWith(clusterId, numNodes, Collections.emptyMap(), topicPartitionCounts);
+    }
+
+    public static MetadataResponse metadataUpdateWith(final String clusterId,
+                                                      final int numNodes,
+                                                      final Map<String, Errors> topicErrors,
+                                                      final Map<String, Integer> topicPartitionCounts) {
+        final List<Node> nodes = new ArrayList<>(numNodes);
+        for (int i = 0; i < numNodes; i++)
+            nodes.add(new Node(i, "localhost", 1969 + i));
+
+        List<MetadataResponse.TopicMetadata> topicMetadata = new ArrayList<>();
+        for (Map.Entry<String, Integer> topicPartitionCountEntry : topicPartitionCounts.entrySet()) {
+            String topic = topicPartitionCountEntry.getKey();
+            int numPartitions = topicPartitionCountEntry.getValue();
+
+            List<MetadataResponse.PartitionMetadata> partitionMetadata = new ArrayList<>(numPartitions);
+            for (int i = 0; i < numPartitions; i++) {
+                Node leader = nodes.get(i % nodes.size());
+                List<Node> replicas = Collections.singletonList(leader);
+                partitionMetadata.add(new MetadataResponse.PartitionMetadata(
+                        Errors.NONE, i, leader, Optional.empty(), replicas, replicas, replicas));
+            }
+
+            topicMetadata.add(new MetadataResponse.TopicMetadata(Errors.NONE, topic,
+                    Topic.isInternal(topic), partitionMetadata));
+        }
+
+        for (Map.Entry<String, Errors> topicErrorEntry : topicErrors.entrySet()) {
+            String topic = topicErrorEntry.getKey();
+            topicMetadata.add(new MetadataResponse.TopicMetadata(topicErrorEntry.getValue(), topic,
+                    Topic.isInternal(topic), Collections.emptyList()));
+        }
+
+        return new MetadataResponse(nodes, clusterId, 0, topicMetadata);
     }
 
     public static Cluster clusterWith(final int nodes, final String topic, final int partitions) {
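
The metadataUpdateWith overloads above assemble a full MetadataResponse (brokers, round-robin partition leaders, replicas, and optional per-topic errors), which is exactly what MockClient.updateMetadata consumes. A minimal usage sketch, assuming the MockTime and Metadata wiring used by the client tests in this patch:

    MockTime time = new MockTime();
    Metadata metadata = new Metadata(0, Long.MAX_VALUE, true);
    MockClient client = new MockClient(time, metadata);

    // Two brokers, four partitions of "test"; leaders alternate between the
    // brokers because metadataUpdateWith assigns them round-robin.
    client.updateMetadata(TestUtils.metadataUpdateWith(2, Collections.singletonMap("test", 4)));

    // Topic-level errors can be injected through the widest overload:
    MetadataResponse response = TestUtils.metadataUpdateWith("kafka-cluster", 2,
            Collections.singletonMap("test", Errors.TOPIC_AUTHORIZATION_FAILED),
            Collections.emptyMap());
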
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
index cb62fb6..ed48d57 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
@@ -23,7 +23,6 @@ import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.NetworkClient;
 import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
-import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.MetricConfig;
@@ -99,7 +98,7 @@ public class WorkerGroupMember {
             List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(
                     config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG),
                     config.getString(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG));
-            this.metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), 0);
+            this.metadata.bootstrap(addresses, time.milliseconds());
             String metricGrpPrefix = "connect";
             ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config, time);
             NetworkClient netClient = new NetworkClient(
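
Metadata.bootstrap replaces the old two-step pattern of funnelling bootstrap addresses through a synthetic Cluster, and it records the current time rather than the hard-coded 0 the old call passed. A condensed before/after, using the signature shown above:

    // Before: this.metadata.update(Cluster.bootstrap(addresses), Collections.emptySet(), 0);
    this.metadata.bootstrap(addresses, time.milliseconds());
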
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
index ede6c71..7ccb68c 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
@@ -19,7 +19,6 @@ package org.apache.kafka.connect.runtime.distributed;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.MockClient;
 import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
-import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.Errors;
@@ -74,8 +73,7 @@ public class WorkerCoordinatorTest {
     private long retryBackoffMs = 100;
     private MockTime time;
     private MockClient client;
-    private Cluster cluster = TestUtils.singletonCluster("topic", 1);
-    private Node node = cluster.nodes().get(0);
+    private Node node;
     private Metadata metadata;
     private Metrics metrics;
     private ConsumerNetworkClient consumerClient;
@@ -92,16 +90,15 @@ public class WorkerCoordinatorTest {
         LogContext loggerFactory = new LogContext();
 
         this.time = new MockTime();
-        this.client = new MockClient(time);
         this.metadata = new Metadata(0, Long.MAX_VALUE, true);
-        this.metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
+        this.client = new MockClient(time, metadata);
+        this.client.updateMetadata(TestUtils.metadataUpdateWith(1, Collections.singletonMap("topic", 1)));
+        this.node = metadata.fetch().nodes().get(0);
         this.consumerClient = new ConsumerNetworkClient(loggerFactory, client, metadata, time, 100, 1000, heartbeatIntervalMs);
         this.metrics = new Metrics(time);
         this.rebalanceListener = new MockRebalanceListener();
         this.configStorage = PowerMock.createMock(KafkaConfigBackingStore.class);
 
-        client.setNode(node);
-
         this.coordinator = new WorkerCoordinator(
                 loggerFactory,
                 consumerClient,
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
index 5b1e155..26c8e71 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
@@ -27,6 +27,7 @@ import org.apache.kafka.common.TopicPartitionInfo;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.ApiError;
 import org.apache.kafka.common.requests.CreateTopicsResponse;
+import org.apache.kafka.common.utils.MockTime;
 import org.junit.Test;
 
 import java.util.Collections;
@@ -49,8 +50,7 @@ public class TopicAdminTest {
     public void returnNullWithApiVersionMismatch() {
         final NewTopic newTopic = TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build();
         Cluster cluster = createCluster(1);
-        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
-            env.kafkaClient().setNode(cluster.controller());
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
             env.kafkaClient().prepareResponse(createTopicResponseWithUnsupportedVersion(newTopic));
             TopicAdmin admin = new TopicAdmin(null, env.adminClient());
@@ -63,8 +63,7 @@ public class TopicAdminTest {
     public void returnNullWithClusterAuthorizationFailure() {
         final NewTopic newTopic = TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build();
         Cluster cluster = createCluster(1);
-        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
-            env.kafkaClient().setNode(cluster.nodes().iterator().next());
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) {
             env.kafkaClient().prepareResponse(createTopicResponseWithClusterAuthorizationException(newTopic));
             TopicAdmin admin = new TopicAdmin(null, env.adminClient());
             boolean created = admin.createTopic(newTopic);
diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala
index 5e0f7c8..ae2ffb1 100644
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -439,8 +439,7 @@ object AdminClient {
     val brokerUrls = config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)
     val clientDnsLookup = config.getString(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG)
     val brokerAddresses = ClientUtils.parseAndValidateAddresses(brokerUrls, clientDnsLookup)
-    val bootstrapCluster = Cluster.bootstrap(brokerAddresses)
-    metadata.update(bootstrapCluster, Collections.emptySet(), 0)
+    metadata.bootstrap(brokerAddresses, time.milliseconds())
 
     val clientId = "admin-" + AdminClientIdSequence.getAndIncrement()
 
@@ -482,6 +481,6 @@ object AdminClient {
       requestTimeoutMs,
       retryBackoffMs,
       highLevelClient,
-      bootstrapCluster.nodes.asScala.toList)
+      metadata.fetch.nodes.asScala.toList)
   }
 }
diff --git a/core/src/test/scala/unit/kafka/server/epoch/util/ReplicaFetcherMockBlockingSend.scala b/core/src/test/scala/unit/kafka/server/epoch/util/ReplicaFetcherMockBlockingSend.scala
index b7c037e..8c7e0dc 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/util/ReplicaFetcherMockBlockingSend.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/util/ReplicaFetcherMockBlockingSend.scala
@@ -16,8 +16,12 @@
   */
 package kafka.server.epoch.util
 
+import java.util
+import java.util.Collections
+
 import kafka.cluster.BrokerEndPoint
 import kafka.server.BlockingSend
+import org.apache.kafka.clients.MockClient.MockMetadataUpdater
 import org.apache.kafka.clients.{ClientRequest, ClientResponse, MockClient}
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.Records
@@ -35,7 +39,11 @@ import org.apache.kafka.common.{Node, TopicPartition}
   * setOffsetsForNextResponse
   */
 class ReplicaFetcherMockBlockingSend(offsets: java.util.Map[TopicPartition, EpochEndOffset], destination: BrokerEndPoint, time: Time) extends BlockingSend {
-  private val client = new MockClient(new SystemTime)
+  private val client = new MockClient(new SystemTime, new MockMetadataUpdater {
+    override def fetchNodes(): util.List[Node] = Collections.emptyList()
+    override def isUpdateNeeded: Boolean = false
+    override def update(time: Time, update: MockClient.MetadataUpdate): Unit = {}
+  })
   var fetchCount = 0
   var epochFetchCount = 0
   var lastUsedOffsetForLeaderEpochVersion = -1
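
For callers that have no Metadata instance at all, MockClient now accepts a MockMetadataUpdater, stubbed out above for the replica fetcher. Translating the Scala overrides back to Java, the contract appears to be roughly the following; this shape is inferred from the stub above, not copied from the interface definition itself:

    public interface MockMetadataUpdater {
        List<Node> fetchNodes();
        boolean isUpdateNeeded();
        void update(Time time, MockClient.MetadataUpdate update);
    }
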
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
index bb199b7..cc9a2ff 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.config.SslConfigs;
 import org.apache.kafka.common.config.types.Password;
+import org.apache.kafka.common.internals.Topic;
 import org.apache.kafka.common.serialization.LongDeserializer;
 import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.Serdes;
@@ -604,9 +605,11 @@ public abstract class AbstractResetIntegrationTest {
     private void assertInternalTopicsGotDeleted(final String intermediateUserTopic) throws Exception {
         // do not use a list-topics request, but read from the embedded cluster's zookeeper path directly to confirm
         if (intermediateUserTopic != null) {
-            cluster.waitForRemainingTopics(30000, INPUT_TOPIC, OUTPUT_TOPIC, OUTPUT_TOPIC_2, OUTPUT_TOPIC_2_RERUN, TestUtils.GROUP_METADATA_TOPIC_NAME, intermediateUserTopic);
+            cluster.waitForRemainingTopics(30000, INPUT_TOPIC, OUTPUT_TOPIC, OUTPUT_TOPIC_2, OUTPUT_TOPIC_2_RERUN,
+                    Topic.GROUP_METADATA_TOPIC_NAME, intermediateUserTopic);
         } else {
-            cluster.waitForRemainingTopics(30000, INPUT_TOPIC, OUTPUT_TOPIC, OUTPUT_TOPIC_2, OUTPUT_TOPIC_2_RERUN, TestUtils.GROUP_METADATA_TOPIC_NAME);
+            cluster.waitForRemainingTopics(30000, INPUT_TOPIC, OUTPUT_TOPIC, OUTPUT_TOPIC_2, OUTPUT_TOPIC_2_RERUN,
+                    Topic.GROUP_METADATA_TOPIC_NAME);
         }
     }
 }