You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2018/10/30 20:20:25 UTC
[kafka] branch trunk updated: KAFKA-7567;
Clean up internal metadata usage for consistency and extensibility
(#5813)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new d71cb54 KAFKA-7567; Clean up internal metadata usage for consistency and extensibility (#5813)
d71cb54 is described below
commit d71cb54672e63d2f0f4b999668336642a9a63a1d
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Tue Oct 30 13:20:13 2018 -0700
KAFKA-7567; Clean up internal metadata usage for consistency and extensibility (#5813)
This patch makes two improvements to internal metadata handling logic and testing:
1. It reduces dependence on the public object `Cluster` for internal metadata propagation since it is not easy to evolve. As an example, we need to propagate leader epochs from the metadata response to `Metadata`, but it is not straightforward to do this without exposing it in `PartitionInfo` since that is what `Cluster` uses internally. By making this change, we are able to remove some redundant `Cluster` building logic.
2. We want to make the metadata handling in `MockClient` simpler and more consistent. Currently we have a mix of metadata update mechanisms which are internally inconsistent with each other and do not match the implementation in `NetworkClient`.
Reviewers: Rajini Sivaram <ra...@googlemail.com>
---
.../java/org/apache/kafka/clients/ClientUtils.java | 9 +-
.../java/org/apache/kafka/clients/Metadata.java | 90 +++---
.../org/apache/kafka/clients/NetworkClient.java | 7 +-
.../kafka/clients/admin/KafkaAdminClient.java | 37 ++-
.../kafka/clients/consumer/KafkaConsumer.java | 2 +-
.../kafka/clients/producer/KafkaProducer.java | 2 +-
.../main/java/org/apache/kafka/common/Cluster.java | 12 +-
.../org/apache/kafka/common/PartitionInfo.java | 10 +-
.../org/apache/kafka/common/internals/Topic.java | 2 +-
.../kafka/common/requests/MetadataResponse.java | 7 +
.../org/apache/kafka/clients/ClientUtilsTest.java | 4 +-
.../org/apache/kafka/clients/MetadataTest.java | 159 +++++------
.../java/org/apache/kafka/clients/MockClient.java | 149 ++++++----
.../apache/kafka/clients/NetworkClientTest.java | 10 +-
.../clients/admin/AdminClientUnitTestEnv.java | 55 ++--
.../kafka/clients/admin/KafkaAdminClientTest.java | 124 +++------
.../kafka/clients/consumer/KafkaConsumerTest.java | 303 +++++++++------------
.../internals/AbstractCoordinatorTest.java | 17 +-
.../internals/ConsumerCoordinatorTest.java | 90 +++---
.../internals/ConsumerNetworkClientTest.java | 4 +-
.../clients/consumer/internals/FetcherTest.java | 109 ++++----
.../kafka/clients/producer/KafkaProducerTest.java | 93 +++----
.../clients/producer/internals/SenderTest.java | 69 ++---
.../producer/internals/TransactionManagerTest.java | 16 +-
.../test/java/org/apache/kafka/test/TestUtils.java | 60 +++-
.../runtime/distributed/WorkerGroupMember.java | 3 +-
.../runtime/distributed/WorkerCoordinatorTest.java | 11 +-
.../apache/kafka/connect/util/TopicAdminTest.java | 7 +-
core/src/main/scala/kafka/admin/AdminClient.scala | 5 +-
.../util/ReplicaFetcherMockBlockingSend.scala | 10 +-
.../integration/AbstractResetIntegrationTest.java | 7 +-
31 files changed, 726 insertions(+), 757 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
index fe83c5c..4d93324 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
@@ -45,9 +45,12 @@ public final class ClientUtils {
private ClientUtils() {
}
- public static List<InetSocketAddress> parseAndValidateAddresses(List<String> urls, String clientDnsLookup) {
+ public static List<InetSocketAddress> parseAndValidateAddresses(List<String> urls, String clientDnsLookupConfig) {
+ return parseAndValidateAddresses(urls, ClientDnsLookup.forConfig(clientDnsLookupConfig));
+ }
+
+ public static List<InetSocketAddress> parseAndValidateAddresses(List<String> urls, ClientDnsLookup clientDnsLookup) {
List<InetSocketAddress> addresses = new ArrayList<>();
- ClientDnsLookup clientDnsLookupBehaviour = ClientDnsLookup.forConfig(clientDnsLookup);
for (String url : urls) {
if (url != null && !url.isEmpty()) {
try {
@@ -56,7 +59,7 @@ public final class ClientUtils {
if (host == null || port == null)
throw new ConfigException("Invalid url in " + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG + ": " + url);
- if (clientDnsLookupBehaviour == ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY) {
+ if (clientDnsLookup == ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY) {
InetAddress[] inetAddresses = InetAddress.getAllByName(host);
for (InetAddress inetAddress : inetAddresses) {
String resolvedCanonicalName = inetAddress.getCanonicalHostName();
diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index 0abb5c4..1028de7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -18,15 +18,15 @@ package org.apache.kafka.clients;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.internals.ClusterResourceListeners;
-import org.apache.kafka.common.Node;
-import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.internals.ClusterResourceListeners;
+import org.apache.kafka.common.requests.MetadataResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
+import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -64,6 +64,7 @@ public class Metadata implements Closeable {
private long lastSuccessfulRefreshMs;
private AuthenticationException authenticationException;
private Cluster cluster;
+ private Set<String> unavailableTopics = Collections.emptySet();
private boolean needUpdate;
/* Topics with expiry time */
private final Map<String, Long> topics;
@@ -74,7 +75,9 @@ public class Metadata implements Closeable {
private final boolean topicExpiryEnabled;
private boolean isClosed;
- public Metadata(long refreshBackoffMs, long metadataExpireMs, boolean allowAutoTopicCreation) {
+ public Metadata(long refreshBackoffMs,
+ long metadataExpireMs,
+ boolean allowAutoTopicCreation) {
this(refreshBackoffMs, metadataExpireMs, allowAutoTopicCreation, false, new ClusterResourceListeners());
}
@@ -88,8 +91,11 @@ public class Metadata implements Closeable {
* @param topicExpiryEnabled If true, enable expiry of unused topics
* @param clusterResourceListeners List of ClusterResourceListeners which will receive metadata updates.
*/
- public Metadata(long refreshBackoffMs, long metadataExpireMs, boolean allowAutoTopicCreation,
- boolean topicExpiryEnabled, ClusterResourceListeners clusterResourceListeners) {
+ public Metadata(long refreshBackoffMs,
+ long metadataExpireMs,
+ boolean allowAutoTopicCreation,
+ boolean topicExpiryEnabled,
+ ClusterResourceListeners clusterResourceListeners) {
this.refreshBackoffMs = refreshBackoffMs;
this.metadataExpireMs = metadataExpireMs;
this.allowAutoTopicCreation = allowAutoTopicCreation;
@@ -231,17 +237,23 @@ public class Metadata implements Closeable {
return this.topics.containsKey(topic);
}
+ public synchronized void bootstrap(List<InetSocketAddress> addresses, long now) {
+ this.needUpdate = true;
+ this.lastRefreshMs = now;
+ this.lastSuccessfulRefreshMs = now;
+ this.version += 1;
+ this.cluster = Cluster.bootstrap(addresses);
+ }
+
/**
* Updates the cluster metadata. If topic expiry is enabled, expiry time
* is set for topics if required and expired topics are removed from the metadata.
*
- * @param newCluster the cluster containing metadata for topics with valid metadata
- * @param unavailableTopics topics which are non-existent or have one or more partitions whose
- * leader is not known
+ * @param metadataResponse metadata response received from the broker
* @param now current time in milliseconds
*/
- public synchronized void update(Cluster newCluster, Set<String> unavailableTopics, long now) {
- Objects.requireNonNull(newCluster, "cluster should not be null");
+ public synchronized void update(MetadataResponse metadataResponse, long now) {
+ Objects.requireNonNull(metadataResponse, "Metadata response cannot be null");
if (isClosed())
throw new IllegalStateException("Update requested after metadata close");
@@ -264,32 +276,34 @@ public class Metadata implements Closeable {
}
}
- for (Listener listener: listeners)
- listener.onMetadataUpdate(newCluster, unavailableTopics);
-
String previousClusterId = cluster.clusterResource().clusterId();
+ this.cluster = metadataResponse.cluster();
+ this.unavailableTopics = metadataResponse.unavailableTopics();
+
+ fireListeners(cluster, unavailableTopics);
+
if (this.needMetadataForAllTopics) {
// the listener may change the interested topics, which could cause another metadata refresh.
// If we have already fetched all topics, however, another fetch should be unnecessary.
this.needUpdate = false;
- this.cluster = getClusterForCurrentTopics(newCluster);
- } else {
- this.cluster = newCluster;
+ this.cluster = metadataResponse.cluster(topics.keySet());
}
- // The bootstrap cluster is guaranteed not to have any useful information
- if (!newCluster.isBootstrapConfigured()) {
- String newClusterId = newCluster.clusterResource().clusterId();
- if (newClusterId == null ? previousClusterId != null : !newClusterId.equals(previousClusterId))
- log.info("Cluster ID: {}", newClusterId);
- clusterResourceListeners.onUpdate(newCluster.clusterResource());
- }
+ String newClusterId = cluster.clusterResource().clusterId();
+ if (newClusterId == null ? previousClusterId != null : !newClusterId.equals(previousClusterId))
+ log.info("Cluster ID: {}", newClusterId);
+ clusterResourceListeners.onUpdate(cluster.clusterResource());
notifyAll();
log.debug("Updated cluster metadata version {} to {}", this.version, this.cluster);
}
+ private void fireListeners(Cluster newCluster, Set<String> unavailableTopics) {
+ for (Listener listener: listeners)
+ listener.onMetadataUpdate(newCluster, unavailableTopics);
+ }
+
/**
* Record an attempt to update the metadata that failed. We need to keep track of this
* to avoid retrying immediately.
@@ -390,32 +404,4 @@ public class Metadata implements Closeable {
requestUpdate();
}
- private Cluster getClusterForCurrentTopics(Cluster cluster) {
- Set<String> unauthorizedTopics = new HashSet<>();
- Set<String> invalidTopics = new HashSet<>();
- Collection<PartitionInfo> partitionInfos = new ArrayList<>();
- List<Node> nodes = Collections.emptyList();
- Set<String> internalTopics = Collections.emptySet();
- Node controller = null;
- String clusterId = null;
- if (cluster != null) {
- clusterId = cluster.clusterResource().clusterId();
- internalTopics = cluster.internalTopics();
- unauthorizedTopics.addAll(cluster.unauthorizedTopics());
- unauthorizedTopics.retainAll(this.topics.keySet());
-
- invalidTopics.addAll(cluster.invalidTopics());
- invalidTopics.addAll(this.cluster.invalidTopics());
-
- for (String topic : this.topics.keySet()) {
- List<PartitionInfo> partitionInfoList = cluster.partitionsForTopic(topic);
- if (!partitionInfoList.isEmpty()) {
- partitionInfos.addAll(partitionInfoList);
- }
- }
- nodes = cluster.nodes();
- controller = cluster.controller();
- }
- return new Cluster(clusterId, nodes, partitionInfos, unauthorizedTopics, invalidTopics, internalTopics, controller);
- }
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index c6f0c0b..902ef1c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -962,7 +962,6 @@ public class NetworkClient implements KafkaClient {
@Override
public void handleCompletedMetadataResponse(RequestHeader requestHeader, long now, MetadataResponse response) {
this.metadataFetchInProgress = false;
- Cluster cluster = response.cluster();
// If any partition has leader with missing listeners, log a few for diagnosing broker configuration
// issues. This could be a transient issue if listeners were added dynamically to brokers.
@@ -984,11 +983,11 @@ public class NetworkClient implements KafkaClient {
// don't update the cluster if there are no valid nodes...the topic we want may still be in the process of being
// created which means we will get errors and no nodes until it exists
- if (cluster.nodes().size() > 0) {
- this.metadata.update(cluster, response.unavailableTopics(), now);
- } else {
+ if (response.brokers().isEmpty()) {
log.trace("Ignoring empty metadata response with correlation id {}.", requestHeader.correlationId());
this.metadata.failedUpdate(now, null);
+ } else {
+ this.metadata.update(response, now);
}
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 33a4786..39817df 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -339,6 +339,10 @@ public class KafkaAdminClient extends AdminClient {
AdminMetadataManager metadataManager = new AdminMetadataManager(logContext,
config.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG),
config.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG));
+ List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(
+ config.getList(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG),
+ config.getString(AdminClientConfig.CLIENT_DNS_LOOKUP_CONFIG));
+ metadataManager.update(Cluster.bootstrap(addresses), time.milliseconds());
List<MetricsReporter> reporters = config.getConfiguredInstances(AdminClientConfig.METRIC_REPORTER_CLASSES_CONFIG,
MetricsReporter.class,
Collections.singletonMap(AdminClientConfig.CLIENT_ID_CONFIG, clientId));
@@ -375,25 +379,25 @@ public class KafkaAdminClient extends AdminClient {
closeQuietly(networkClient, "NetworkClient");
closeQuietly(selector, "Selector");
closeQuietly(channelBuilder, "ChannelBuilder");
- throw new KafkaException("Failed create new KafkaAdminClient", exc);
+ throw new KafkaException("Failed to create new KafkaAdminClient", exc);
}
}
- static KafkaAdminClient createInternal(AdminClientConfig config, KafkaClient client, Time time) {
+ static KafkaAdminClient createInternal(AdminClientConfig config,
+ AdminMetadataManager metadataManager,
+ KafkaClient client,
+ Time time) {
Metrics metrics = null;
String clientId = generateClientId(config);
try {
metrics = new Metrics(new MetricConfig(), new LinkedList<>(), time);
LogContext logContext = createLogContext(clientId);
- AdminMetadataManager metadataManager = new AdminMetadataManager(logContext,
- config.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG),
- config.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG));
return new KafkaAdminClient(config, clientId, time, metadataManager, metrics,
client, null, logContext);
} catch (Throwable exc) {
closeQuietly(metrics, "Metrics");
- throw new KafkaException("Failed create new KafkaAdminClient", exc);
+ throw new KafkaException("Failed to create new KafkaAdminClient", exc);
}
}
@@ -414,10 +418,6 @@ public class KafkaAdminClient extends AdminClient {
this.log = logContext.logger(KafkaAdminClient.class);
this.time = time;
this.metadataManager = metadataManager;
- List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(
- config.getList(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG),
- config.getString(AdminClientConfig.CLIENT_DNS_LOOKUP_CONFIG));
- metadataManager.update(Cluster.bootstrap(addresses), time.milliseconds());
this.metrics = metrics;
this.client = client;
this.runnable = new AdminClientRunnable();
@@ -1399,12 +1399,12 @@ public class KafkaAdminClient extends AdminClient {
@Override
void handleResponse(AbstractResponse abstractResponse) {
MetadataResponse response = (MetadataResponse) abstractResponse;
- Cluster cluster = response.cluster();
Map<String, TopicListing> topicListing = new HashMap<>();
- for (String topicName : cluster.topics()) {
- boolean internal = cluster.internalTopics().contains(topicName);
- if (!internal || options.shouldListInternal())
- topicListing.put(topicName, new TopicListing(topicName, internal));
+ for (MetadataResponse.TopicMetadata topicMetadata : response.topicMetadata()) {
+ String topicName = topicMetadata.topic();
+ boolean isInternal = topicMetadata.isInternal();
+ if (!topicMetadata.isInternal() || options.shouldListInternal())
+ topicListing.put(topicName, new TopicListing(topicName, isInternal));
}
topicListingFuture.complete(topicListing);
}
@@ -2551,12 +2551,11 @@ public class KafkaAdminClient extends AdminClient {
@Override
void handleResponse(AbstractResponse abstractResponse) {
MetadataResponse metadataResponse = (MetadataResponse) abstractResponse;
- Cluster cluster = metadataResponse.cluster();
-
- if (cluster.nodes().isEmpty())
+ Collection<Node> nodes = metadataResponse.brokers();
+ if (nodes.isEmpty())
throw new StaleMetadataException("Metadata fetch failed due to missing broker list");
- HashSet<Node> allNodes = new HashSet<>(cluster.nodes());
+ HashSet<Node> allNodes = new HashSet<>(nodes);
final ListConsumerGroupsResults results = new ListConsumerGroupsResults(allNodes, all);
for (final Node node : allNodes) {
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 714cd94..065e663 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -712,7 +712,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(
config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG),
config.getString(ConsumerConfig.CLIENT_DNS_LOOKUP_CONFIG));
- this.metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), 0);
+ this.metadata.bootstrap(addresses, time.milliseconds());
String metricGrpPrefix = "consumer";
ConsumerMetrics metricsRegistry = new ConsumerMetrics(metricsTags.keySet(), "consumer");
ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config, time);
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index e916771..c9052e3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -416,7 +416,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
} else {
this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG),
true, true, clusterResourceListeners);
- this.metadata.update(Cluster.bootstrap(addresses), Collections.emptySet(), time.milliseconds());
+ this.metadata.bootstrap(addresses, time.milliseconds());
}
this.errors = this.metrics.sensor("errors");
this.sender = newSender(logContext, kafkaClient, this.metadata);
diff --git a/clients/src/main/java/org/apache/kafka/common/Cluster.java b/clients/src/main/java/org/apache/kafka/common/Cluster.java
index 24a18db..8a773bc 100644
--- a/clients/src/main/java/org/apache/kafka/common/Cluster.java
+++ b/clients/src/main/java/org/apache/kafka/common/Cluster.java
@@ -79,12 +79,12 @@ public final class Cluster {
* @param partitions Information about a subset of the topic-partitions this cluster hosts
*/
public Cluster(String clusterId,
- Collection<Node> nodes,
- Collection<PartitionInfo> partitions,
- Set<String> unauthorizedTopics,
- Set<String> invalidTopics,
- Set<String> internalTopics,
- Node controller) {
+ Collection<Node> nodes,
+ Collection<PartitionInfo> partitions,
+ Set<String> unauthorizedTopics,
+ Set<String> invalidTopics,
+ Set<String> internalTopics,
+ Node controller) {
this(clusterId, false, nodes, partitions, unauthorizedTopics, invalidTopics, internalTopics, controller);
}
diff --git a/clients/src/main/java/org/apache/kafka/common/PartitionInfo.java b/clients/src/main/java/org/apache/kafka/common/PartitionInfo.java
index 44cd4f4..7ed54f0 100644
--- a/clients/src/main/java/org/apache/kafka/common/PartitionInfo.java
+++ b/clients/src/main/java/org/apache/kafka/common/PartitionInfo.java
@@ -103,10 +103,12 @@ public class PartitionInfo {
/* Extract the node ids from each item in the array and format for display */
private String formatNodeIds(Node[] nodes) {
StringBuilder b = new StringBuilder("[");
- for (int i = 0; i < nodes.length; i++) {
- b.append(nodes[i].idString());
- if (i < nodes.length - 1)
- b.append(',');
+ if (nodes != null) {
+ for (int i = 0; i < nodes.length; i++) {
+ b.append(nodes[i].idString());
+ if (i < nodes.length - 1)
+ b.append(',');
+ }
}
b.append("]");
return b.toString();
diff --git a/clients/src/main/java/org/apache/kafka/common/internals/Topic.java b/clients/src/main/java/org/apache/kafka/common/internals/Topic.java
index a5ef335..619477d 100644
--- a/clients/src/main/java/org/apache/kafka/common/internals/Topic.java
+++ b/clients/src/main/java/org/apache/kafka/common/internals/Topic.java
@@ -28,7 +28,7 @@ public class Topic {
public static final String TRANSACTION_STATE_TOPIC_NAME = "__transaction_state";
public static final String LEGAL_CHARS = "[a-zA-Z0-9._-]";
- private static final Set<String> INTERNAL_TOPICS = Collections.unmodifiableSet(
+ public static final Set<String> INTERNAL_TOPICS = Collections.unmodifiableSet(
Utils.mkSet(GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME));
private static final int MAX_NAME_LENGTH = 249;
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
index c78066f..fc67571 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
@@ -368,9 +368,16 @@ public class MetadataResponse extends AbstractResponse {
* @return the cluster snapshot
*/
public Cluster cluster() {
+ return cluster(null);
+ }
+
+ public Cluster cluster(Set<String> topicsToRetain) {
Set<String> internalTopics = new HashSet<>();
List<PartitionInfo> partitions = new ArrayList<>();
for (TopicMetadata metadata : topicMetadata) {
+ if (topicsToRetain != null && !topicsToRetain.contains(metadata.topic))
+ continue;
+
if (metadata.error == Errors.NONE) {
if (metadata.isInternal)
internalTopics.add(metadata.topic);
diff --git a/clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java b/clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java
index 227afea..afe5a5d 100644
--- a/clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java
@@ -112,11 +112,11 @@ public class ClientUtilsTest {
}
private List<InetSocketAddress> checkWithoutLookup(String... url) {
- return ClientUtils.parseAndValidateAddresses(Arrays.asList(url), ClientDnsLookup.DEFAULT.toString());
+ return ClientUtils.parseAndValidateAddresses(Arrays.asList(url), ClientDnsLookup.DEFAULT);
}
private List<InetSocketAddress> checkWithLookup(List<String> url) {
- return ClientUtils.parseAndValidateAddresses(url, ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY.toString());
+ return ClientUtils.parseAndValidateAddresses(url, ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY);
}
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
index 969921e..b8c9d64 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
@@ -16,25 +16,26 @@
*/
package org.apache.kafka.clients;
-import java.net.InetSocketAddress;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicReference;
-
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.internals.ClusterResourceListeners;
-import org.apache.kafka.common.Node;
-import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.internals.ClusterResourceListeners;
+import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.test.MockClusterResourceListener;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Test;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -54,10 +55,18 @@ public class MetadataTest {
assertNull("Exception in background thread : " + backgroundError.get(), backgroundError.get());
}
+ private static MetadataResponse emptyMetadataResponse() {
+ return new MetadataResponse(
+ Collections.emptyList(),
+ null,
+ -1,
+ Collections.emptyList());
+ }
+
@Test
public void testMetadata() throws Exception {
long time = 0;
- metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
+ metadata.update(emptyMetadataResponse(), time);
assertFalse("No update needed.", metadata.timeToNextUpdate(time) == 0);
metadata.requestUpdate();
assertFalse("Still no updated needed due to backoff", metadata.timeToNextUpdate(time) == 0);
@@ -72,7 +81,8 @@ public class MetadataTest {
// This simulates the metadata update sequence in KafkaProducer
while (t1.isAlive() || t2.isAlive()) {
if (metadata.timeToNextUpdate(time) == 0) {
- metadata.update(TestUtils.singletonCluster(topic, 1), Collections.<String>emptySet(), time);
+ MetadataResponse response = TestUtils.metadataUpdateWith(1, Collections.singletonMap(topic, 1));
+ metadata.update(response, time);
time += refreshBackoffMs;
}
Thread.sleep(1);
@@ -87,7 +97,7 @@ public class MetadataTest {
@Test
public void testMetadataAwaitAfterClose() throws InterruptedException {
long time = 0;
- metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
+ metadata.update(emptyMetadataResponse(), time);
assertFalse("No update needed.", metadata.timeToNextUpdate(time) == 0);
metadata.requestUpdate();
assertFalse("Still no updated needed due to backoff", metadata.timeToNextUpdate(time) == 0);
@@ -105,7 +115,7 @@ public class MetadataTest {
@Test(expected = IllegalStateException.class)
public void testMetadataUpdateAfterClose() {
metadata.close();
- metadata.update(Cluster.empty(), Collections.<String>emptySet(), 1000);
+ metadata.update(emptyMetadataResponse(), 1000);
}
private static void checkTimeToNextUpdate(long refreshBackoffMs, long metadataExpireMs) {
@@ -126,7 +136,7 @@ public class MetadataTest {
assertEquals(0, metadata.timeToNextUpdate(now));
// lastSuccessfulRefreshMs updated to now.
- metadata.update(Cluster.empty(), Collections.<String>emptySet(), now);
+ metadata.update(emptyMetadataResponse(), now);
// The last update was successful so the remaining time to expire the current metadata should be returned.
assertEquals(largerOfBackoffAndExpire, metadata.timeToNextUpdate(now));
@@ -137,7 +147,7 @@ public class MetadataTest {
assertEquals(refreshBackoffMs, metadata.timeToNextUpdate(now));
// Reset needUpdate to false.
- metadata.update(Cluster.empty(), Collections.<String>emptySet(), now);
+ metadata.update(emptyMetadataResponse(), now);
assertEquals(largerOfBackoffAndExpire, metadata.timeToNextUpdate(now));
// Both metadataExpireMs and refreshBackoffMs elapsed.
@@ -181,13 +191,13 @@ public class MetadataTest {
long now = 10000;
// New topic added to fetch set and update requested. It should allow immediate update.
- metadata.update(Cluster.empty(), Collections.<String>emptySet(), now);
+ metadata.update(emptyMetadataResponse(), now);
metadata.add("new-topic");
assertEquals(0, metadata.timeToNextUpdate(now));
// Even though setTopics called, immediate update isn't necessary if the new topic set isn't
// containing a new topic,
- metadata.update(Cluster.empty(), Collections.<String>emptySet(), now);
+ metadata.update(emptyMetadataResponse(), now);
metadata.setTopics(metadata.topics());
assertEquals(metadataExpireMs, metadata.timeToNextUpdate(now));
@@ -196,12 +206,12 @@ public class MetadataTest {
assertEquals(0, metadata.timeToNextUpdate(now));
// If metadata requested for all topics it should allow immediate update.
- metadata.update(Cluster.empty(), Collections.<String>emptySet(), now);
+ metadata.update(emptyMetadataResponse(), now);
metadata.needMetadataForAllTopics(true);
assertEquals(0, metadata.timeToNextUpdate(now));
// However if metadata is already capable to serve all topics it shouldn't override backoff.
- metadata.update(Cluster.empty(), Collections.<String>emptySet(), now);
+ metadata.update(emptyMetadataResponse(), now);
metadata.needMetadataForAllTopics(true);
assertEquals(metadataExpireMs, metadata.timeToNextUpdate(now));
}
@@ -216,7 +226,7 @@ public class MetadataTest {
@Test
public void testMetadataUpdateWaitTime() throws Exception {
long time = 0;
- metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
+ metadata.update(emptyMetadataResponse(), time);
assertFalse("No update needed.", metadata.timeToNextUpdate(time) == 0);
// first try with a max wait time of 0 and ensure that this returns back without waiting forever
try {
@@ -238,7 +248,7 @@ public class MetadataTest {
@Test
public void testFailedUpdate() {
long time = 100;
- metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
+ metadata.update(emptyMetadataResponse(), time);
assertEquals(100, metadata.timeToNextUpdate(1000));
metadata.failedUpdate(1100, null);
@@ -247,26 +257,24 @@ public class MetadataTest {
assertEquals(100, metadata.lastSuccessfulUpdate());
metadata.needMetadataForAllTopics(true);
- metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
+ metadata.update(emptyMetadataResponse(), time);
assertEquals(100, metadata.timeToNextUpdate(1000));
}
@Test
public void testUpdateWithNeedMetadataForAllTopics() {
long time = 0;
- metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
+ metadata.update(emptyMetadataResponse(), time);
metadata.needMetadataForAllTopics(true);
final List<String> expectedTopics = Collections.singletonList("topic");
metadata.setTopics(expectedTopics);
- metadata.update(new Cluster(null,
- Collections.singletonList(new Node(0, "host1", 1000)),
- Arrays.asList(
- new PartitionInfo("topic", 0, null, null, null),
- new PartitionInfo("topic1", 0, null, null, null)),
- Collections.<String>emptySet(),
- Collections.<String>emptySet()),
- Collections.<String>emptySet(), 100);
+
+ Map<String, Integer> partitionCounts = new HashMap<>();
+ partitionCounts.put("topic", 1);
+ partitionCounts.put("topic1", 1);
+ MetadataResponse metadataResponse = TestUtils.metadataUpdateWith(1, partitionCounts);
+ metadata.update(metadataResponse, 100);
assertArrayEquals("Metadata got updated with wrong set of topics.",
expectedTopics.toArray(), metadata.topics().toArray());
@@ -283,20 +291,15 @@ public class MetadataTest {
metadata = new Metadata(refreshBackoffMs, metadataExpireMs, true, false, listeners);
String hostName = "www.example.com";
- Cluster cluster = Cluster.bootstrap(Arrays.asList(new InetSocketAddress(hostName, 9002)));
- metadata.update(cluster, Collections.<String>emptySet(), time);
+ metadata.bootstrap(Collections.singletonList(new InetSocketAddress(hostName, 9002)), time);
assertFalse("ClusterResourceListener should not called when metadata is updated with bootstrap Cluster",
MockClusterResourceListener.IS_ON_UPDATE_CALLED.get());
- metadata.update(new Cluster(
- "dummy",
- Arrays.asList(new Node(0, "host1", 1000)),
- Arrays.asList(
- new PartitionInfo("topic", 0, null, null, null),
- new PartitionInfo("topic1", 0, null, null, null)),
- Collections.<String>emptySet(),
- Collections.<String>emptySet()),
- Collections.<String>emptySet(), 100);
+ Map<String, Integer> partitionCounts = new HashMap<>();
+ partitionCounts.put("topic", 1);
+ partitionCounts.put("topic1", 1);
+ MetadataResponse metadataResponse = TestUtils.metadataUpdateWith("dummy", 1, partitionCounts);
+ metadata.update(metadataResponse, 100);
assertEquals("MockClusterResourceListener did not get cluster metadata correctly",
"dummy", mockClusterListener.clusterResource().clusterId());
@@ -308,7 +311,7 @@ public class MetadataTest {
public void testListenerGetsNotifiedOfUpdate() {
long time = 0;
final Set<String> topics = new HashSet<>();
- metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
+ metadata.update(emptyMetadataResponse(), time);
metadata.addListener(new Metadata.Listener() {
@Override
public void onMetadataUpdate(Cluster cluster, Set<String> unavailableTopics) {
@@ -317,15 +320,11 @@ public class MetadataTest {
}
});
- metadata.update(new Cluster(
- null,
- Arrays.asList(new Node(0, "host1", 1000)),
- Arrays.asList(
- new PartitionInfo("topic", 0, null, null, null),
- new PartitionInfo("topic1", 0, null, null, null)),
- Collections.<String>emptySet(),
- Collections.<String>emptySet()),
- Collections.<String>emptySet(), 100);
+ Map<String, Integer> partitionCounts = new HashMap<>();
+ partitionCounts.put("topic", 1);
+ partitionCounts.put("topic1", 1);
+ MetadataResponse metadataResponse = TestUtils.metadataUpdateWith("dummy", 1, partitionCounts);
+ metadata.update(metadataResponse, 100);
assertEquals("Listener did not update topics list correctly",
new HashSet<>(Arrays.asList("topic", "topic1")), topics);
@@ -335,7 +334,7 @@ public class MetadataTest {
public void testListenerCanUnregister() {
long time = 0;
final Set<String> topics = new HashSet<>();
- metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
+ metadata.update(emptyMetadataResponse(), time);
final Metadata.Listener listener = new Metadata.Listener() {
@Override
public void onMetadataUpdate(Cluster cluster, Set<String> unavailableTopics) {
@@ -345,27 +344,19 @@ public class MetadataTest {
};
metadata.addListener(listener);
- metadata.update(new Cluster(
- "cluster",
- Collections.singletonList(new Node(0, "host1", 1000)),
- Arrays.asList(
- new PartitionInfo("topic", 0, null, null, null),
- new PartitionInfo("topic1", 0, null, null, null)),
- Collections.<String>emptySet(),
- Collections.<String>emptySet()),
- Collections.<String>emptySet(), 100);
+ Map<String, Integer> partitionCounts = new HashMap<>();
+ partitionCounts.put("topic", 1);
+ partitionCounts.put("topic1", 1);
+ MetadataResponse metadataResponse = TestUtils.metadataUpdateWith("dummy", 1, partitionCounts);
+ metadata.update(metadataResponse, 100);
metadata.removeListener(listener);
- metadata.update(new Cluster(
- "cluster",
- Arrays.asList(new Node(0, "host1", 1000)),
- Arrays.asList(
- new PartitionInfo("topic2", 0, null, null, null),
- new PartitionInfo("topic3", 0, null, null, null)),
- Collections.<String>emptySet(),
- Collections.<String>emptySet()),
- Collections.<String>emptySet(), 100);
+ partitionCounts.clear();
+ partitionCounts.put("topic2", 1);
+ partitionCounts.put("topic3", 1);
+ metadataResponse = TestUtils.metadataUpdateWith("dummy", 1, partitionCounts);
+ metadata.update(metadataResponse, 100);
assertEquals("Listener did not update topics list correctly",
new HashSet<>(Arrays.asList("topic", "topic1")), topics);
@@ -378,17 +369,17 @@ public class MetadataTest {
// Test that topic is expired if not used within the expiry interval
long time = 0;
metadata.add("topic1");
- metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
+ metadata.update(emptyMetadataResponse(), time);
time += Metadata.TOPIC_EXPIRY_MS;
- metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
+ metadata.update(emptyMetadataResponse(), time);
assertFalse("Unused topic not expired", metadata.containsTopic("topic1"));
// Test that topic is not expired if used within the expiry interval
metadata.add("topic2");
- metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
+ metadata.update(emptyMetadataResponse(), time);
for (int i = 0; i < 3; i++) {
time += Metadata.TOPIC_EXPIRY_MS / 2;
- metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
+ metadata.update(emptyMetadataResponse(), time);
assertTrue("Topic expired even though in use", metadata.containsTopic("topic2"));
metadata.add("topic2");
}
@@ -397,9 +388,9 @@ public class MetadataTest {
HashSet<String> topics = new HashSet<>();
topics.add("topic4");
metadata.setTopics(topics);
- metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
+ metadata.update(emptyMetadataResponse(), time);
time += Metadata.TOPIC_EXPIRY_MS;
- metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
+ metadata.update(emptyMetadataResponse(), time);
assertFalse("Unused topic not expired", metadata.containsTopic("topic4"));
}
@@ -410,17 +401,17 @@ public class MetadataTest {
// Test that topic is not expired if not used within the expiry interval
long time = 0;
metadata.add("topic1");
- metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
+ metadata.update(emptyMetadataResponse(), time);
time += Metadata.TOPIC_EXPIRY_MS;
- metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
+ metadata.update(emptyMetadataResponse(), time);
assertTrue("Unused topic expired when expiry disabled", metadata.containsTopic("topic1"));
// Test that topic is not expired if used within the expiry interval
metadata.add("topic2");
- metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
+ metadata.update(emptyMetadataResponse(), time);
for (int i = 0; i < 3; i++) {
time += Metadata.TOPIC_EXPIRY_MS / 2;
- metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
+ metadata.update(emptyMetadataResponse(), time);
assertTrue("Topic expired even though in use", metadata.containsTopic("topic2"));
metadata.add("topic2");
}
@@ -430,7 +421,7 @@ public class MetadataTest {
topics.add("topic4");
metadata.setTopics(topics);
time += metadataExpireMs * 2;
- metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
+ metadata.update(emptyMetadataResponse(), time);
assertTrue("Unused topic expired when expiry disabled", metadata.containsTopic("topic4"));
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index a586af8..0cfb3be 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.clients;
-import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.InterruptException;
@@ -24,12 +23,12 @@ import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -38,6 +37,7 @@ import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.stream.Collectors;
/**
* A mock network client for use testing code
@@ -73,10 +73,7 @@ public class MockClient implements KafkaClient {
private int correlation;
private final Time time;
- private final Metadata metadata;
- private Set<String> unavailableTopics;
- private Cluster cluster;
- private Node node = null;
+ private final MockMetadataUpdater metadataUpdater;
private final Set<String> ready = new HashSet<>();
// Nodes awaiting reconnect backoff, will not be chosen by leastLoadedNode
@@ -97,14 +94,13 @@ public class MockClient implements KafkaClient {
private volatile NodeApiVersions nodeApiVersions = NodeApiVersions.create();
private volatile int numBlockingWakeups = 0;
- public MockClient(Time time) {
- this(time, null);
+ public MockClient(Time time, Metadata metadata) {
+ this(time, new DefaultMockMetadataUpdater(metadata));
}
- public MockClient(Time time, Metadata metadata) {
+ public MockClient(Time time, MockMetadataUpdater metadataUpdater) {
this.time = time;
- this.metadata = metadata;
- this.unavailableTopics = Collections.emptySet();
+ this.metadataUpdater = metadataUpdater;
this.blackedOut = new TransientSet<>(time);
this.unreachable = new TransientSet<>(time);
this.delayedReady = new TransientSet<>(time);
@@ -280,21 +276,13 @@ public class MockClient implements KafkaClient {
checkTimeoutOfPendingRequests(now);
List<ClientResponse> copy = new ArrayList<>(this.responses);
- if (metadata != null && metadata.updateRequested()) {
+ // We skip metadata updates if all nodes are currently blacked out
+ if (metadataUpdater.isUpdateNeeded() && leastLoadedNode(now) != null) {
MetadataUpdate metadataUpdate = metadataUpdates.poll();
- if (cluster != null)
- metadata.update(cluster, this.unavailableTopics, time.milliseconds());
- if (metadataUpdate == null)
- metadata.update(metadata.fetch(), this.unavailableTopics, time.milliseconds());
- else {
- if (metadataUpdate.expectMatchRefreshTopics
- && !metadata.topics().equals(metadataUpdate.cluster.topics())) {
- throw new IllegalStateException("The metadata topics does not match expectation. "
- + "Expected topics: " + metadataUpdate.cluster.topics()
- + ", asked topics: " + metadata.topics());
- }
- this.unavailableTopics = metadataUpdate.unavailableTopics;
- metadata.update(metadataUpdate.cluster, metadataUpdate.unavailableTopics, time.milliseconds());
+ if (metadataUpdate != null) {
+ metadataUpdater.update(time, metadataUpdate);
+ } else {
+ metadataUpdater.updateWithCurrentMetadata(time);
}
}
@@ -310,6 +298,7 @@ public class MockClient implements KafkaClient {
return Math.max(0, currentTimeMs - startTimeMs);
}
+
private void checkTimeoutOfPendingRequests(long nowMs) {
ClientRequest request = requests.peek();
while (request != null && elapsedTimeMs(nowMs, request.createdTimeMs()) > request.requestTimeoutMs()) {
@@ -350,9 +339,9 @@ public class MockClient implements KafkaClient {
public void respond(AbstractResponse response, boolean disconnected) {
- ClientRequest request = null;
- if (requests.size() > 0)
- request = requests.remove();
+ if (requests.isEmpty())
+ throw new IllegalStateException("No requests pending for inbound response " + response);
+ ClientRequest request = requests.poll();
short version = request.requestBuilder().latestAllowedVersion();
responses.add(new ClientResponse(request.makeHeader(version), request.callback(), request.destination(),
request.createdTimeMs(), time.milliseconds(), disconnected, null, null, response));
@@ -463,22 +452,17 @@ public class MockClient implements KafkaClient {
return futureResponses.size();
}
+ /**
+ * Queue a metadata response for later delivery without requiring its topics to match
+ * the topics being tracked. Delegates to the two-argument overload, passing
+ * {@code false} for {@code expectMatchMetadataTopics}.
+ *
+ * @param updateResponse the metadata response to queue
+ */
- public void prepareMetadataUpdate(Cluster cluster, Set<String> unavailableTopics) {
- metadataUpdates.add(new MetadataUpdate(cluster, unavailableTopics, false));
+ public void prepareMetadataUpdate(MetadataResponse updateResponse) {
+ prepareMetadataUpdate(updateResponse, false);
}
+ /**
+ * Queue a metadata response by wrapping it in a {@link MetadataUpdate} and appending
+ * it to {@code metadataUpdates}. Queued updates are dequeued by this client when the
+ * metadata updater reports that an update is needed.
+ *
+ * @param updateResponse the metadata response to queue
+ * @param expectMatchMetadataTopics stored on the queued update as
+ * {@code expectMatchRefreshTopics}
+ */
- public void prepareMetadataUpdate(Cluster cluster,
- Set<String> unavailableTopics,
+ public void prepareMetadataUpdate(MetadataResponse updateResponse,
boolean expectMatchMetadataTopics) {
- metadataUpdates.add(new MetadataUpdate(cluster, unavailableTopics, expectMatchMetadataTopics));
- }
-
- public void setNode(Node node) {
- this.node = node;
+ metadataUpdates.add(new MetadataUpdate(updateResponse, expectMatchMetadataTopics));
}
+ /**
+ * Apply a metadata response immediately through the metadata updater instead of
+ * adding it to the queue of prepared updates. The update is created with
+ * {@code expectMatchRefreshTopics = false}.
+ *
+ * @param updateResponse the metadata response to apply
+ */
- public void cluster(Cluster cluster) {
- this.cluster = cluster;
+ public void updateMetadata(MetadataResponse updateResponse) {
+ metadataUpdater.update(time, new MetadataUpdate(updateResponse, false));
}
@Override
@@ -534,7 +518,7 @@ public class MockClient implements KafkaClient {
@Override
public void close() {
- metadata.close();
+ metadataUpdater.close();
}
@Override
@@ -545,9 +529,11 @@ public class MockClient implements KafkaClient {
+ /**
+ * Return the first node reported by {@code metadataUpdater.fetchNodes()} that is not
+ * in the {@code blackedOut} set at time {@code now}, or {@code null} when there is no
+ * such node.
+ */
@Override
public Node leastLoadedNode(long now) {
// Consistent with NetworkClient, we do not return nodes awaiting reconnect backoff
- if (blackedOut.contains(node, now))
- return null;
- return this.node;
+ for (Node node : metadataUpdater.fetchNodes()) {
+ if (!blackedOut.contains(node, now))
+ return node;
+ }
+ return null;
}
/**
@@ -564,15 +550,20 @@ public class MockClient implements KafkaClient {
this.nodeApiVersions = nodeApiVersions;
}
+ /**
+ * A metadata response paired with a flag saying whether the topics in the response
+ * are expected to equal the topics being tracked when the update is applied.
+ */
- private static class MetadataUpdate {
- final Cluster cluster;
- final Set<String> unavailableTopics;
+ public static class MetadataUpdate {
+ final MetadataResponse updateResponse;
final boolean expectMatchRefreshTopics;
- MetadataUpdate(Cluster cluster, Set<String> unavailableTopics, boolean expectMatchRefreshTopics) {
- this.cluster = cluster;
- this.unavailableTopics = unavailableTopics;
+
+ MetadataUpdate(MetadataResponse updateResponse, boolean expectMatchRefreshTopics) {
+ this.updateResponse = updateResponse;
this.expectMatchRefreshTopics = expectMatchRefreshTopics;
}
+
+ // Names of the topics listed in the response's topic metadata, collected into a set
+ private Set<String> topics() {
+ return updateResponse.topicMetadata().stream()
+ .map(MetadataResponse.TopicMetadata::topic)
+ .collect(Collectors.toSet());
+ }
}
private static class TransientSet<T> {
@@ -614,4 +605,64 @@ public class MockClient implements KafkaClient {
}
+ /**
+ * This is a simplified version of {@link MetadataUpdater} which is used to facilitate
+ * metadata tracking, primarily in order to serve {@link KafkaClient#leastLoadedNode(long)}
+ * and bookkeeping through {@link Metadata}. The extensibility allows AdminClient, which does
+ * not rely on {@link Metadata}, to do its own thing.
+ */
+ public interface MockMetadataUpdater {
+ /** Nodes currently known to this updater; the mock client picks its least loaded node from these. */
+ List<Node> fetchNodes();
+
+ /** Whether a metadata update should be applied; the mock client checks this before applying one. */
+ boolean isUpdateNeeded();
+
+ /** Apply the given metadata update. */
+ void update(Time time, MetadataUpdate update);
+
+ /** Invoked when an update is needed but none has been prepared; does nothing by default. */
+ default void updateWithCurrentMetadata(Time time) {}
+
+ /** Invoked when the mock client is closed; does nothing by default. */
+ default void close() {}
+ }
+
+ /**
+ * {@link MockMetadataUpdater} that routes every update through a {@link Metadata}
+ * instance and remembers the most recently applied update so it can be applied again.
+ */
+ private static class DefaultMockMetadataUpdater implements MockMetadataUpdater {
+ private final Metadata metadata;
+ // Most recent update applied by update(); null until the first update succeeds
+ private MetadataUpdate lastUpdate;
+
+ public DefaultMockMetadataUpdater(Metadata metadata) {
+ this.metadata = metadata;
+ }
+
+ // Nodes of the cluster currently held by the wrapped Metadata
+ @Override
+ public List<Node> fetchNodes() {
+ return metadata.fetch().nodes();
+ }
+
+ // An update is needed exactly when the wrapped Metadata has one requested
+ @Override
+ public boolean isUpdateNeeded() {
+ return metadata.updateRequested();
+ }
+
+ // Re-apply the last update; fails with IllegalStateException if none was applied yet
+ @Override
+ public void updateWithCurrentMetadata(Time time) {
+ if (lastUpdate == null)
+ throw new IllegalStateException("No previous metadata update to use");
+ update(time, lastUpdate);
+ }
+
+ // Apply the update's response to Metadata at the current time and remember the update
+ @Override
+ public void update(Time time, MetadataUpdate update) {
+ // When matching is requested, the tracked topics must equal the topics in the response
+ if (update.expectMatchRefreshTopics && !metadata.topics().equals(update.topics())) {
+ throw new IllegalStateException("The metadata topics does not match expectation. "
+ + "Expected topics: " + update.topics()
+ + ", asked topics: " + metadata.topics());
+ }
+ metadata.update(update.updateResponse, time.milliseconds());
+ this.lastUpdate = update;
+ }
+
+ // Closes the wrapped Metadata
+ @Override
+ public void close() {
+ metadata.close();
+ }
+ }
+
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index 8abe9a40..5a4e1e8 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.clients;
-import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.network.NetworkReceive;
@@ -27,6 +26,7 @@ import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.requests.MetadataRequest;
+import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.ProduceRequest;
import org.apache.kafka.common.requests.ResponseHeader;
import org.apache.kafka.common.utils.LogContext;
@@ -56,9 +56,9 @@ public class NetworkClientTest {
protected final MockTime time = new MockTime();
protected final MockSelector selector = new MockSelector(time);
protected final Metadata metadata = new Metadata(0, Long.MAX_VALUE, true);
- protected final int nodeId = 1;
- protected final Cluster cluster = TestUtils.singletonCluster("test", nodeId);
- protected final Node node = cluster.nodes().get(0);
+ protected final MetadataResponse initialMetadataResponse = TestUtils.metadataUpdateWith(1,
+ Collections.singletonMap("test", 1));
+ protected final Node node = initialMetadataResponse.brokers().iterator().next();
protected final long reconnectBackoffMsTest = 10 * 1000;
protected final long reconnectBackoffMaxMsTest = 10 * 10000;
@@ -89,7 +89,7 @@ public class NetworkClientTest {
@Before
public void setup() {
selector.reset();
- metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
+ metadata.update(initialMetadataResponse, time.milliseconds());
}
@Test(expected = IllegalStateException.class)
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java
index e77b48b..89f5fde 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java
@@ -17,12 +17,14 @@
package org.apache.kafka.clients.admin;
import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.clients.admin.internals.AdminMetadataManager;
import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
-import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
/**
@@ -42,38 +44,45 @@ public class AdminClientUnitTestEnv implements AutoCloseable {
private final MockClient mockClient;
private final KafkaAdminClient adminClient;
- public AdminClientUnitTestEnv(Cluster cluster, String...vals) {
+ public AdminClientUnitTestEnv(Cluster cluster, String... vals) {
this(Time.SYSTEM, cluster, vals);
}
- public AdminClientUnitTestEnv(Time time, Cluster cluster, String...vals) {
+ public AdminClientUnitTestEnv(Time time, Cluster cluster, String... vals) {
this(time, cluster, newStrMap(vals));
}
- public AdminClientUnitTestEnv(Time time, Cluster cluster, Map<String, Object> config) {
- this(newMockClient(time, cluster), time, cluster, config);
- }
-
- public AdminClientUnitTestEnv(MockClient mockClient, Time time, Cluster cluster) {
- this(mockClient, time, cluster, newStrMap());
- }
-
- private static MockClient newMockClient(Time time, Cluster cluster) {
- MockClient mockClient = new MockClient(time);
- mockClient.prepareResponse(new MetadataResponse(cluster.nodes(),
- cluster.clusterResource().clusterId(),
- cluster.controller() == null ? MetadataResponse.NO_CONTROLLER_ID : cluster.controller().id(),
- Collections.emptyList()));
- return mockClient;
+ public AdminClientUnitTestEnv(Time time, Cluster cluster) {
+ this(time, cluster, newStrMap());
}
- public AdminClientUnitTestEnv(MockClient mockClient, Time time, Cluster cluster,
- Map<String, Object> config) {
+ public AdminClientUnitTestEnv(Time time, Cluster cluster, Map<String, Object> config) {
this.time = time;
this.cluster = cluster;
AdminClientConfig adminClientConfig = new AdminClientConfig(config);
- this.mockClient = mockClient;
- this.adminClient = KafkaAdminClient.createInternal(adminClientConfig, mockClient, time);
+
+ AdminMetadataManager metadataManager = new AdminMetadataManager(new LogContext(),
+ adminClientConfig.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG),
+ adminClientConfig.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG));
+ this.mockClient = new MockClient(time, new MockClient.MockMetadataUpdater() {
+ @Override
+ public List<Node> fetchNodes() {
+ return cluster.nodes();
+ }
+
+ @Override
+ public boolean isUpdateNeeded() {
+ return false;
+ }
+
+ @Override
+ public void update(Time time, MockClient.MetadataUpdate update) {
+ throw new UnsupportedOperationException();
+ }
+ });
+
+ metadataManager.update(cluster, time.milliseconds());
+ this.adminClient = KafkaAdminClient.createInternal(adminClientConfig, metadataManager, mockClient, time);
}
public Time time() {
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index dc55ea2..1f62d39 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -16,6 +16,8 @@
*/
package org.apache.kafka.clients.admin;
+import org.apache.kafka.clients.ClientDnsLookup;
+import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.admin.DeleteAclsResult.FilterResults;
@@ -191,6 +193,11 @@ public class KafkaAdminClientTest {
Collections.emptySet(), nodes.get(controllerIndex));
}
+ /**
+ * Build a bootstrap {@link Cluster} from the single address {@code localhost:8121},
+ * parsed with {@link ClientDnsLookup#DEFAULT}.
+ */
+ private static Cluster mockBootstrapCluster() {
+ return Cluster.bootstrap(ClientUtils.parseAndValidateAddresses(
+ Collections.singletonList("localhost:8121"), ClientDnsLookup.DEFAULT));
+ }
+
private static AdminClientUnitTestEnv mockClientEnv(String... configVals) {
return new AdminClientUnitTestEnv(mockCluster(0), configVals);
}
@@ -206,15 +213,9 @@ public class KafkaAdminClientTest {
*/
@Test
public void testTimeoutWithoutMetadata() throws Exception {
- Cluster cluster = mockCluster(0);
- MockClient mockClient = new MockClient(Time.SYSTEM);
- try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockClient,
- Time.SYSTEM,
- cluster,
- newStrMap(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:8121",
- AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "10"))) {
+ try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(Time.SYSTEM, mockBootstrapCluster(),
+ newStrMap(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "10"))) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
- env.kafkaClient().setNode(new Node(0, "localhost", 8121));
env.kafkaClient().prepareResponse(new CreateTopicsResponse(Collections.singletonMap("myTopic", new ApiError(Errors.NONE, ""))));
KafkaFuture<Void> future = env.adminClient().createTopics(
Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(0, asList(0, 1, 2)))),
@@ -228,16 +229,13 @@ public class KafkaAdminClientTest {
// This tests the scenario in which we successfully connect to the bootstrap server, but
// the server disconnects before sending the full response
- Cluster cluster = Cluster.bootstrap(Collections.singletonList(new InetSocketAddress("localhost", 8121)));
- MockClient mockClient = new MockClient(Time.SYSTEM);
- mockClient.setNodeApiVersions(NodeApiVersions.create());
- mockClient.setNode(cluster.nodes().get(0));
-
- try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockClient, Time.SYSTEM, cluster)) {
+ Cluster cluster = mockBootstrapCluster();
+ try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(Time.SYSTEM, cluster)) {
Cluster discoveredCluster = mockCluster(0);
+ env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
env.kafkaClient().prepareResponse(request -> request instanceof MetadataRequest, null, true);
- env.kafkaClient().prepareResponse(body -> body instanceof MetadataRequest,
- new MetadataResponse(discoveredCluster.nodes(), discoveredCluster.clusterResource().clusterId(),
+ env.kafkaClient().prepareResponse(request -> request instanceof MetadataRequest,
+ new MetadataResponse(discoveredCluster.nodes(), discoveredCluster.clusterResource().clusterId(),
1, Collections.emptyList()));
env.kafkaClient().prepareResponse(body -> body instanceof CreateTopicsRequest,
new CreateTopicsResponse(Collections.singletonMap("myTopic", new ApiError(Errors.NONE, ""))));
@@ -256,13 +254,10 @@ public class KafkaAdminClientTest {
// which prevents AdminClient from being able to send the initial metadata request
Cluster cluster = Cluster.bootstrap(Collections.singletonList(new InetSocketAddress("localhost", 8121)));
- MockClient mockClient = new MockClient(Time.SYSTEM);
- mockClient.setNodeApiVersions(NodeApiVersions.create());
- mockClient.setNode(cluster.nodes().get(0));
- mockClient.setUnreachable(cluster.nodes().get(0), 200);
-
- try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockClient, Time.SYSTEM, cluster)) {
+ try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(Time.SYSTEM, cluster)) {
Cluster discoveredCluster = mockCluster(0);
+ env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+ env.kafkaClient().setUnreachable(cluster.nodes().get(0), 200);
env.kafkaClient().prepareResponse(body -> body instanceof MetadataRequest,
new MetadataResponse(discoveredCluster.nodes(), discoveredCluster.clusterResource().clusterId(),
1, Collections.emptyList()));
@@ -283,16 +278,12 @@ public class KafkaAdminClientTest {
@Test
public void testPropagatedMetadataFetchException() throws Exception {
Cluster cluster = mockCluster(0);
- MockClient mockClient = new MockClient(Time.SYSTEM);
- mockClient.createPendingAuthenticationError(cluster.nodeById(0),
- TimeUnit.DAYS.toMillis(1));
- try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockClient,
- Time.SYSTEM,
- cluster,
+ try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(Time.SYSTEM, cluster,
newStrMap(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:8121",
AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "10"))) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
- env.kafkaClient().setNode(env.cluster().nodeById(0));
+ env.kafkaClient().createPendingAuthenticationError(cluster.nodeById(0),
+ TimeUnit.DAYS.toMillis(1));
env.kafkaClient().prepareResponse(new CreateTopicsResponse(Collections.singletonMap("myTopic", new ApiError(Errors.NONE, ""))));
KafkaFuture<Void> future = env.adminClient().createTopics(
Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(0, asList(0, 1, 2)))),
@@ -305,7 +296,6 @@ public class KafkaAdminClientTest {
public void testCreateTopics() throws Exception {
try (AdminClientUnitTestEnv env = mockClientEnv()) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
- env.kafkaClient().setNode(env.cluster().controller());
env.kafkaClient().prepareResponse(body -> body instanceof CreateTopicsRequest,
new CreateTopicsResponse(Collections.singletonMap("myTopic", new ApiError(Errors.NONE, ""))));
KafkaFuture<Void> future = env.adminClient().createTopics(
@@ -319,30 +309,26 @@ public class KafkaAdminClientTest {
public void testCreateTopicsRetryBackoff() throws Exception {
Cluster cluster = mockCluster(0);
MockTime time = new MockTime();
- MockClient mockClient = new MockClient(time);
int retryBackoff = 100;
- mockClient.prepareResponse(body -> body instanceof MetadataRequest,
- new MetadataResponse(cluster.nodes(), cluster.clusterResource().clusterId(),
- 1, Collections.emptyList()));
+ try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, cluster,
+ newStrMap(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG, "" + retryBackoff))) {
+ MockClient mockClient = env.kafkaClient();
- AtomicLong firstAttemptTime = new AtomicLong(0);
- AtomicLong secondAttemptTime = new AtomicLong(0);
+ mockClient.setNodeApiVersions(NodeApiVersions.create());
- mockClient.prepareResponse(body -> {
- firstAttemptTime.set(time.milliseconds());
- return body instanceof CreateTopicsRequest;
- }, null, true);
+ AtomicLong firstAttemptTime = new AtomicLong(0);
+ AtomicLong secondAttemptTime = new AtomicLong(0);
- mockClient.prepareResponse(body -> {
- secondAttemptTime.set(time.milliseconds());
- return body instanceof CreateTopicsRequest;
- }, new CreateTopicsResponse(Collections.singletonMap("myTopic", new ApiError(Errors.NONE, ""))));
+ mockClient.prepareResponse(body -> {
+ firstAttemptTime.set(time.milliseconds());
+ return body instanceof CreateTopicsRequest;
+ }, null, true);
- try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockClient, time, cluster,
- newStrMap(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG, "" + retryBackoff))) {
- mockClient.setNodeApiVersions(NodeApiVersions.create());
- mockClient.setNode(env.cluster().controller());
+ mockClient.prepareResponse(body -> {
+ secondAttemptTime.set(time.milliseconds());
+ return body instanceof CreateTopicsRequest;
+ }, new CreateTopicsResponse(Collections.singletonMap("myTopic", new ApiError(Errors.NONE, ""))));
KafkaFuture<Void> future = env.adminClient().createTopics(
Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(0, asList(0, 1, 2)))),
@@ -354,18 +340,17 @@ public class KafkaAdminClientTest {
time.sleep(retryBackoff);
future.get();
- }
- long actualRetryBackoff = secondAttemptTime.get() - firstAttemptTime.get();
- assertEquals("CreateTopics retry did not await expected backoff",
- retryBackoff, actualRetryBackoff);
+ long actualRetryBackoff = secondAttemptTime.get() - firstAttemptTime.get();
+ assertEquals("CreateTopics retry did not await expected backoff",
+ retryBackoff, actualRetryBackoff);
+ }
}
@Test
public void testCreateTopicsHandleNotControllerException() throws Exception {
try (AdminClientUnitTestEnv env = mockClientEnv()) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
- env.kafkaClient().setNode(env.cluster().nodeById(0));
env.kafkaClient().prepareResponseFrom(new CreateTopicsResponse(
Collections.singletonMap("myTopic", new ApiError(Errors.NOT_CONTROLLER, ""))),
env.cluster().nodeById(0));
@@ -387,7 +372,6 @@ public class KafkaAdminClientTest {
public void testDeleteTopics() throws Exception {
try (AdminClientUnitTestEnv env = mockClientEnv()) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
- env.kafkaClient().setNode(env.cluster().controller());
env.kafkaClient().prepareResponse(body -> body instanceof DeleteTopicsRequest,
new DeleteTopicsResponse(Collections.singletonMap("myTopic", Errors.NONE)));
@@ -413,7 +397,6 @@ public class KafkaAdminClientTest {
public void testInvalidTopicNames() throws Exception {
try (AdminClientUnitTestEnv env = mockClientEnv()) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
- env.kafkaClient().setNode(env.cluster().controller());
List<String> sillyTopicNames = asList("", null);
Map<String, KafkaFuture<Void>> deleteFutures = env.adminClient().deleteTopics(sillyTopicNames).values();
@@ -447,12 +430,10 @@ public class KafkaAdminClientTest {
// We should continue retrying on metadata update failures in spite of retry configuration
String topic = "topic";
- MockClient mockClient = new MockClient(Time.SYSTEM);
Cluster bootstrapCluster = Cluster.bootstrap(singletonList(new InetSocketAddress("localhost", 9999)));
Cluster initializedCluster = mockCluster(0);
- mockClient.setNode(bootstrapCluster.nodes().get(0));
- try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockClient, Time.SYSTEM, bootstrapCluster,
+ try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(Time.SYSTEM, bootstrapCluster,
newStrMap(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999",
AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000000",
AdminClientConfig.RETRIES_CONFIG, "0"))) {
@@ -484,17 +465,12 @@ public class KafkaAdminClientTest {
@Test
public void testAdminClientApisAuthenticationFailure() throws Exception {
- Cluster cluster = mockCluster(0);
- MockClient mockClient = new MockClient(Time.SYSTEM);
- mockClient.createPendingAuthenticationError(cluster.nodeById(0),
- TimeUnit.DAYS.toMillis(1));
- try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockClient,
- Time.SYSTEM,
- cluster,
- newStrMap(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:8121",
- AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000"))) {
+ Cluster cluster = mockBootstrapCluster();
+ try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(Time.SYSTEM, cluster,
+ newStrMap(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000"))) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
- env.kafkaClient().setNode(env.cluster().controller());
+ env.kafkaClient().createPendingAuthenticationError(cluster.nodes().get(0),
+ TimeUnit.DAYS.toMillis(1));
callAdminClientApisAndExpectAnAuthenticationError(env);
}
}
@@ -570,7 +546,6 @@ public class KafkaAdminClientTest {
public void testDescribeAcls() throws Exception {
try (AdminClientUnitTestEnv env = mockClientEnv()) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
- env.kafkaClient().setNode(env.cluster().controller());
// Test a call where we get back ACL1 and ACL2.
env.kafkaClient().prepareResponse(new DescribeAclsResponse(0, ApiError.NONE,
@@ -597,7 +572,6 @@ public class KafkaAdminClientTest {
public void testCreateAcls() throws Exception {
try (AdminClientUnitTestEnv env = mockClientEnv()) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
- env.kafkaClient().setNode(env.cluster().controller());
// Test a call where we successfully create two ACLs.
env.kafkaClient().prepareResponse(new CreateAclsResponse(0,
@@ -625,7 +599,6 @@ public class KafkaAdminClientTest {
public void testDeleteAcls() throws Exception {
try (AdminClientUnitTestEnv env = mockClientEnv()) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
- env.kafkaClient().setNode(env.cluster().controller());
// Test a call where one filter has an error.
env.kafkaClient().prepareResponse(new DeleteAclsResponse(0, asList(
@@ -677,7 +650,6 @@ public class KafkaAdminClientTest {
AdminClientConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, "1",
AdminClientConfig.RECONNECT_BACKOFF_MS_CONFIG, "1")) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
- env.kafkaClient().setNode(nodes.get(0));
assertEquals(time, env.time());
assertEquals(env.time(), ((KafkaAdminClient) env.adminClient()).time());
@@ -718,7 +690,6 @@ public class KafkaAdminClientTest {
public void testDescribeConfigs() throws Exception {
try (AdminClientUnitTestEnv env = mockClientEnv()) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
- env.kafkaClient().setNode(env.cluster().controller());
env.kafkaClient().prepareResponse(new DescribeConfigsResponse(0,
Collections.singletonMap(new ConfigResource(ConfigResource.Type.BROKER, "0"),
new DescribeConfigsResponse.Config(ApiError.NONE, Collections.emptySet()))));
@@ -732,7 +703,6 @@ public class KafkaAdminClientTest {
public void testCreatePartitions() throws Exception {
try (AdminClientUnitTestEnv env = mockClientEnv()) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
- env.kafkaClient().setNode(env.cluster().controller());
Map<String, ApiError> m = new HashMap<>();
m.put("my_topic", ApiError.NONE);
@@ -784,7 +754,6 @@ public class KafkaAdminClientTest {
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
- env.kafkaClient().setNode(env.cluster().nodes().get(0));
Map<TopicPartition, DeleteRecordsResponse.PartitionResponse> m = new HashMap<>();
m.put(myTopicPartition0,
@@ -889,7 +858,6 @@ public class KafkaAdminClientTest {
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
- env.kafkaClient().setNode(env.cluster().controller());
// Empty metadata response should be retried
env.kafkaClient().prepareResponse(
@@ -991,7 +959,6 @@ public class KafkaAdminClientTest {
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, cluster,
AdminClientConfig.RETRIES_CONFIG, "0")) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
- env.kafkaClient().setNode(env.cluster().controller());
// Empty metadata causes the request to fail since we have no list of brokers
// to send the ListGroups requests to
@@ -1022,7 +989,6 @@ public class KafkaAdminClientTest {
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
- env.kafkaClient().setNode(env.cluster().controller());
env.kafkaClient().prepareResponse(new FindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
@@ -1085,7 +1051,6 @@ public class KafkaAdminClientTest {
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
- env.kafkaClient().setNode(env.cluster().controller());
env.kafkaClient().prepareResponse(new FindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
@@ -1129,7 +1094,6 @@ public class KafkaAdminClientTest {
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
- env.kafkaClient().setNode(env.cluster().controller());
//Retriable FindCoordinatorResponse errors should be retried
env.kafkaClient().prepareResponse(new FindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode()));
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 4ac5876..89fca84 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -32,7 +32,6 @@ import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
-import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.InterruptException;
@@ -59,6 +58,7 @@ import org.apache.kafka.common.requests.JoinGroupResponse;
import org.apache.kafka.common.requests.LeaveGroupResponse;
import org.apache.kafka.common.requests.ListOffsetRequest;
import org.apache.kafka.common.requests.ListOffsetResponse;
+import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.OffsetCommitRequest;
import org.apache.kafka.common.requests.OffsetCommitResponse;
import org.apache.kafka.common.requests.OffsetFetchResponse;
@@ -361,14 +361,12 @@ public class KafkaConsumerTest {
@Test
public void verifyHeartbeatSent() throws Exception {
Time time = new MockTime();
- Cluster cluster = TestUtils.singletonCluster(topic, 1);
- Node node = cluster.nodes().get(0);
-
Metadata metadata = createMetadata();
- metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
-
MockClient client = new MockClient(time, metadata);
- client.setNode(node);
+
+ initMetadata(client, Collections.singletonMap(topic, 1));
+ Node node = metadata.fetch().nodes().get(0);
+
PartitionAssignor assignor = new RoundRobinAssignor();
KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, true);
@@ -396,14 +394,11 @@ public class KafkaConsumerTest {
@Test
public void verifyHeartbeatSentWhenFetchedDataReady() throws Exception {
Time time = new MockTime();
- Cluster cluster = TestUtils.singletonCluster(topic, 1);
- Node node = cluster.nodes().get(0);
-
Metadata metadata = createMetadata();
- metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
-
MockClient client = new MockClient(time, metadata);
- client.setNode(node);
+
+ initMetadata(client, Collections.singletonMap(topic, 1));
+ Node node = metadata.fetch().nodes().get(0);
PartitionAssignor assignor = new RoundRobinAssignor();
KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, true);
@@ -430,16 +425,14 @@ public class KafkaConsumerTest {
}
@Test
- public void verifyPollTimesOutDuringMetadataUpdate() throws Exception {
+ public void verifyPollTimesOutDuringMetadataUpdate() {
final Time time = new MockTime();
- final Cluster cluster = TestUtils.singletonCluster(topic, 1);
- final Node node = cluster.nodes().get(0);
+ Metadata metadata = createMetadata();
+ final MockClient client = new MockClient(time, metadata);
- final Metadata metadata = createMetadata();
- metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
+ initMetadata(client, Collections.singletonMap(topic, 1));
+ Node node = metadata.fetch().nodes().get(0);
- final MockClient client = new MockClient(time, metadata);
- client.setNode(node);
final PartitionAssignor assignor = new RoundRobinAssignor();
final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, true);
@@ -454,16 +447,14 @@ public class KafkaConsumerTest {
}
@Test
- public void verifyDeprecatedPollDoesNotTimeOutDuringMetadataUpdate() throws Exception {
+ public void verifyDeprecatedPollDoesNotTimeOutDuringMetadataUpdate() {
final Time time = new MockTime();
- final Cluster cluster = TestUtils.singletonCluster(topic, 1);
- final Node node = cluster.nodes().get(0);
+ Metadata metadata = createMetadata();
+ final MockClient client = new MockClient(time, metadata);
- final Metadata metadata = createMetadata();
- metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
+ initMetadata(client, Collections.singletonMap(topic, 1));
+ Node node = metadata.fetch().nodes().get(0);
- final MockClient client = new MockClient(time, metadata);
- client.setNode(node);
final PartitionAssignor assignor = new RoundRobinAssignor();
final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, true);
@@ -483,14 +474,10 @@ public class KafkaConsumerTest {
@Test
public void verifyNoCoordinatorLookupForManualAssignmentWithSeek() {
Time time = new MockTime();
- Cluster cluster = TestUtils.singletonCluster(topic, 1);
- Node node = cluster.nodes().get(0);
-
Metadata metadata = createMetadata();
- metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
-
MockClient client = new MockClient(time, metadata);
- client.setNode(node);
+
+ initMetadata(client, Collections.singletonMap(topic, 1));
PartitionAssignor assignor = new RoundRobinAssignor();
KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, true);
@@ -514,14 +501,9 @@ public class KafkaConsumerTest {
// a reset on another partition.
Time time = new MockTime();
- Cluster cluster = TestUtils.singletonCluster(topic, 2);
- Node node = cluster.nodes().get(0);
-
Metadata metadata = createMetadata();
- metadata.update(cluster, Collections.emptySet(), time.milliseconds());
-
MockClient client = new MockClient(time, metadata);
- client.setNode(node);
+ initMetadata(client, Collections.singletonMap(topic, 2));
KafkaConsumer<String, String> consumer = newConsumerNoAutoCommit(time, client, metadata);
consumer.assign(Arrays.asList(tp0, tp1));
@@ -555,17 +537,20 @@ public class KafkaConsumerTest {
assertEquals(singleton(tp0), records.partitions());
}
+ private void initMetadata(MockClient mockClient, Map<String, Integer> partitionCounts) {
+ MetadataResponse initialMetadata = TestUtils.metadataUpdateWith(1, partitionCounts);
+ mockClient.updateMetadata(initialMetadata);
+ }
+
@Test(expected = NoOffsetForPartitionException.class)
public void testMissingOffsetNoResetPolicy() {
Time time = new MockTime();
- Cluster cluster = TestUtils.singletonCluster(topic, 1);
- Node node = cluster.nodes().get(0);
-
Metadata metadata = createMetadata();
- metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
-
MockClient client = new MockClient(time, metadata);
- client.setNode(node);
+
+ initMetadata(client, Collections.singletonMap(topic, 1));
+ Node node = metadata.fetch().nodes().get(0);
+
PartitionAssignor assignor = new RoundRobinAssignor();
KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
@@ -583,14 +568,12 @@ public class KafkaConsumerTest {
@Test
public void testResetToCommittedOffset() {
Time time = new MockTime();
- Cluster cluster = TestUtils.singletonCluster(topic, 1);
- Node node = cluster.nodes().get(0);
-
Metadata metadata = createMetadata();
- metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
-
MockClient client = new MockClient(time, metadata);
- client.setNode(node);
+
+ initMetadata(client, Collections.singletonMap(topic, 1));
+ Node node = metadata.fetch().nodes().get(0);
+
PartitionAssignor assignor = new RoundRobinAssignor();
KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
@@ -609,14 +592,12 @@ public class KafkaConsumerTest {
@Test
public void testResetUsingAutoResetPolicy() {
Time time = new MockTime();
- Cluster cluster = TestUtils.singletonCluster(topic, 1);
- Node node = cluster.nodes().get(0);
-
Metadata metadata = createMetadata();
- metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
-
MockClient client = new MockClient(time, metadata);
- client.setNode(node);
+
+ initMetadata(client, Collections.singletonMap(topic, 1));
+ Node node = metadata.fetch().nodes().get(0);
+
PartitionAssignor assignor = new RoundRobinAssignor();
KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
@@ -640,14 +621,12 @@ public class KafkaConsumerTest {
long offset2 = 20000;
Time time = new MockTime();
- Cluster cluster = TestUtils.singletonCluster(topic, 2);
- Node node = cluster.nodes().get(0);
-
Metadata metadata = createMetadata();
- metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
-
MockClient client = new MockClient(time, metadata);
- client.setNode(node);
+
+ initMetadata(client, Collections.singletonMap(topic, 2));
+ Node node = metadata.fetch().nodes().get(0);
+
PartitionAssignor assignor = new RoundRobinAssignor();
KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, true);
@@ -679,14 +658,12 @@ public class KafkaConsumerTest {
@Test
public void testAutoCommitSentBeforePositionUpdate() {
Time time = new MockTime();
- Cluster cluster = TestUtils.singletonCluster(topic, 1);
- Node node = cluster.nodes().get(0);
-
Metadata metadata = createMetadata();
- metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
-
MockClient client = new MockClient(time, metadata);
- client.setNode(node);
+
+ initMetadata(client, Collections.singletonMap(topic, 1));
+ Node node = metadata.fetch().nodes().get(0);
+
PartitionAssignor assignor = new RoundRobinAssignor();
KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, true);
@@ -717,17 +694,15 @@ public class KafkaConsumerTest {
public void testRegexSubscription() {
String unmatchedTopic = "unmatched";
Time time = new MockTime();
-
- Map<String, Integer> topicMetadata = new HashMap<>();
- topicMetadata.put(topic, 1);
- topicMetadata.put(unmatchedTopic, 1);
-
- Cluster cluster = TestUtils.clusterWith(1, topicMetadata);
Metadata metadata = createMetadata();
- Node node = cluster.nodes().get(0);
-
MockClient client = new MockClient(time, metadata);
- client.setNode(node);
+
+ Map<String, Integer> partitionCounts = new HashMap<>();
+ partitionCounts.put(topic, 1);
+ partitionCounts.put(unmatchedTopic, 1);
+ initMetadata(client, partitionCounts);
+ Node node = metadata.fetch().nodes().get(0);
+
PartitionAssignor assignor = new RoundRobinAssignor();
KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, true);
@@ -735,7 +710,7 @@ public class KafkaConsumerTest {
consumer.subscribe(Pattern.compile(topic), getConsumerRebalanceListener(consumer));
- client.prepareMetadataUpdate(cluster, Collections.<String>emptySet());
+ client.prepareMetadataUpdate(TestUtils.metadataUpdateWith(1, partitionCounts));
consumer.updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE));
@@ -752,20 +727,14 @@ public class KafkaConsumerTest {
TopicPartition otherTopicPartition = new TopicPartition(otherTopic, 0);
Time time = new MockTime();
-
- Map<String, Integer> topicMetadata = new HashMap<>();
- topicMetadata.put(topic, 1);
- topicMetadata.put(otherTopic, 1);
-
- Cluster cluster = TestUtils.clusterWith(1, topicMetadata);
Metadata metadata = createMetadata();
- Node node = cluster.nodes().get(0);
-
MockClient client = new MockClient(time, metadata);
- client.setNode(node);
- client.cluster(cluster);
- metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
+ Map<String, Integer> partitionCounts = new HashMap<>();
+ partitionCounts.put(topic, 1);
+ partitionCounts.put(otherTopic, 1);
+ initMetadata(client, partitionCounts);
+ Node node = metadata.fetch().nodes().get(0);
KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, false);
@@ -779,6 +748,7 @@ public class KafkaConsumerTest {
consumer.subscribe(Pattern.compile(otherTopic), getConsumerRebalanceListener(consumer));
+ client.prepareMetadataUpdate(TestUtils.metadataUpdateWith(1, partitionCounts));
prepareRebalance(client, node, singleton(otherTopic), assignor, singletonList(otherTopicPartition), coordinator);
consumer.poll(Duration.ZERO);
@@ -789,14 +759,12 @@ public class KafkaConsumerTest {
@Test
public void testWakeupWithFetchDataAvailable() throws Exception {
final Time time = new MockTime();
- Cluster cluster = TestUtils.singletonCluster(topic, 1);
- Node node = cluster.nodes().get(0);
-
Metadata metadata = createMetadata();
- metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
-
MockClient client = new MockClient(time, metadata);
- client.setNode(node);
+
+ initMetadata(client, Collections.singletonMap(topic, 1));
+ Node node = metadata.fetch().nodes().get(0);
+
PartitionAssignor assignor = new RoundRobinAssignor();
KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, true);
@@ -838,16 +806,14 @@ public class KafkaConsumerTest {
}
@Test
- public void testPollThrowsInterruptExceptionIfInterrupted() throws Exception {
+ public void testPollThrowsInterruptExceptionIfInterrupted() {
final Time time = new MockTime();
- Cluster cluster = TestUtils.singletonCluster(topic, 1);
- final Node node = cluster.nodes().get(0);
+ final Metadata metadata = createMetadata();
+ final MockClient client = new MockClient(time, metadata);
- Metadata metadata = createMetadata();
- metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
+ initMetadata(client, Collections.singletonMap(topic, 1));
+ Node node = metadata.fetch().nodes().get(0);
- final MockClient client = new MockClient(time, metadata);
- client.setNode(node);
final PartitionAssignor assignor = new RoundRobinAssignor();
KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, false);
@@ -872,14 +838,12 @@ public class KafkaConsumerTest {
@Test
public void fetchResponseWithUnexpectedPartitionIsIgnored() {
Time time = new MockTime();
- Cluster cluster = TestUtils.singletonCluster(singletonMap(topic, 1));
- Node node = cluster.nodes().get(0);
-
Metadata metadata = createMetadata();
- metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
-
MockClient client = new MockClient(time, metadata);
- client.setNode(node);
+
+ initMetadata(client, Collections.singletonMap(topic, 1));
+ Node node = metadata.fetch().nodes().get(0);
+
PartitionAssignor assignor = new RangeAssignor();
KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, true);
@@ -909,18 +873,16 @@ public class KafkaConsumerTest {
@Test
public void testSubscriptionChangesWithAutoCommitEnabled() {
Time time = new MockTime();
+ Metadata metadata = createMetadata();
+ MockClient client = new MockClient(time, metadata);
+
Map<String, Integer> tpCounts = new HashMap<>();
tpCounts.put(topic, 1);
tpCounts.put(topic2, 1);
tpCounts.put(topic3, 1);
- Cluster cluster = TestUtils.singletonCluster(tpCounts);
- Node node = cluster.nodes().get(0);
+ initMetadata(client, tpCounts);
+ Node node = metadata.fetch().nodes().get(0);
- Metadata metadata = createMetadata();
- metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
-
- MockClient client = new MockClient(time, metadata);
- client.setNode(node);
PartitionAssignor assignor = new RangeAssignor();
KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, true);
@@ -1025,17 +987,15 @@ public class KafkaConsumerTest {
@Test
public void testSubscriptionChangesWithAutoCommitDisabled() {
Time time = new MockTime();
+ Metadata metadata = createMetadata();
+ MockClient client = new MockClient(time, metadata);
+
Map<String, Integer> tpCounts = new HashMap<>();
tpCounts.put(topic, 1);
tpCounts.put(topic2, 1);
- Cluster cluster = TestUtils.singletonCluster(tpCounts);
- Node node = cluster.nodes().get(0);
+ initMetadata(client, tpCounts);
+ Node node = metadata.fetch().nodes().get(0);
- Metadata metadata = createMetadata();
- metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
-
- MockClient client = new MockClient(time, metadata);
- client.setNode(node);
PartitionAssignor assignor = new RangeAssignor();
KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, false);
@@ -1088,17 +1048,15 @@ public class KafkaConsumerTest {
@Test
public void testManualAssignmentChangeWithAutoCommitEnabled() {
Time time = new MockTime();
+ Metadata metadata = createMetadata();
+ MockClient client = new MockClient(time, metadata);
+
Map<String, Integer> tpCounts = new HashMap<>();
tpCounts.put(topic, 1);
tpCounts.put(topic2, 1);
- Cluster cluster = TestUtils.singletonCluster(tpCounts);
- Node node = cluster.nodes().get(0);
-
- Metadata metadata = createMetadata();
- metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
+ initMetadata(client, tpCounts);
+ Node node = metadata.fetch().nodes().get(0);
- MockClient client = new MockClient(time, metadata);
- client.setNode(node);
PartitionAssignor assignor = new RangeAssignor();
KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, true);
@@ -1145,17 +1103,15 @@ public class KafkaConsumerTest {
@Test
public void testManualAssignmentChangeWithAutoCommitDisabled() {
Time time = new MockTime();
+ Metadata metadata = createMetadata();
+ MockClient client = new MockClient(time, metadata);
+
Map<String, Integer> tpCounts = new HashMap<>();
tpCounts.put(topic, 1);
tpCounts.put(topic2, 1);
- Cluster cluster = TestUtils.singletonCluster(tpCounts);
- Node node = cluster.nodes().get(0);
-
- Metadata metadata = createMetadata();
- metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
+ initMetadata(client, tpCounts);
+ Node node = metadata.fetch().nodes().get(0);
- MockClient client = new MockClient(time, metadata);
- client.setNode(node);
PartitionAssignor assignor = new RangeAssignor();
KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, false);
@@ -1203,14 +1159,12 @@ public class KafkaConsumerTest {
@Test
public void testOffsetOfPausedPartitions() {
Time time = new MockTime();
- Cluster cluster = TestUtils.singletonCluster(topic, 2);
- Node node = cluster.nodes().get(0);
-
Metadata metadata = createMetadata();
- metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
-
MockClient client = new MockClient(time, metadata);
- client.setNode(node);
+
+ initMetadata(client, Collections.singletonMap(topic, 2));
+ Node node = metadata.fetch().nodes().get(0);
+
PartitionAssignor assignor = new RangeAssignor();
KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, true);
@@ -1334,14 +1288,12 @@ public class KafkaConsumerTest {
@Test
public void shouldAttemptToRejoinGroupAfterSyncGroupFailed() throws Exception {
Time time = new MockTime();
- Cluster cluster = TestUtils.singletonCluster(topic, 1);
- Node node = cluster.nodes().get(0);
+ Metadata metadata = createMetadata();
+ MockClient client = new MockClient(time, metadata);
- Metadata metadata = new Metadata(0, Long.MAX_VALUE, false);
- metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
+ initMetadata(client, Collections.singletonMap(topic, 1));
+ Node node = metadata.fetch().nodes().get(0);
- MockClient client = new MockClient(time, metadata);
- client.setNode(node);
PartitionAssignor assignor = new RoundRobinAssignor();
KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, false);
@@ -1404,21 +1356,19 @@ public class KafkaConsumerTest {
long waitMs,
boolean interrupt) throws Exception {
Time time = new MockTime();
- Cluster cluster = TestUtils.singletonCluster(topic, 1);
- Node node = cluster.nodes().get(0);
-
Metadata metadata = createMetadata();
- metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
-
MockClient client = new MockClient(time, metadata);
- client.setNode(node);
+
+ initMetadata(client, Collections.singletonMap(topic, 1));
+ Node node = metadata.fetch().nodes().get(0);
+
PartitionAssignor assignor = new RoundRobinAssignor();
final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, false);
consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
Node coordinator = prepareRebalance(client, node, assignor, singletonList(tp0), null);
- client.prepareMetadataUpdate(cluster, Collections.<String>emptySet());
+ client.prepareMetadataUpdate(TestUtils.metadataUpdateWith(1, Collections.singletonMap(topic, 1)));
consumer.updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE));
@@ -1542,16 +1492,12 @@ public class KafkaConsumerTest {
private KafkaConsumer<String, String> consumerWithPendingAuthentication() {
Time time = new MockTime();
- Map<String, Integer> tpCounts = new HashMap<>();
- tpCounts.put(topic, 1);
- Cluster cluster = TestUtils.singletonCluster(tpCounts);
- Node node = cluster.nodes().get(0);
-
Metadata metadata = createMetadata();
- metadata.update(cluster, Collections.emptySet(), time.milliseconds());
-
MockClient client = new MockClient(time, metadata);
- client.setNode(node);
+
+ initMetadata(client, singletonMap(topic, 1));
+ Node node = metadata.fetch().nodes().get(0);
+
PartitionAssignor assignor = new RangeAssignor();
client.createPendingAuthenticationError(node, 0);
@@ -1861,29 +1807,24 @@ public class KafkaConsumerTest {
@Test(expected = InvalidTopicException.class)
public void testSubscriptionOnInvalidTopic() {
Time time = new MockTime();
- Cluster cluster = TestUtils.singletonCluster();
- Node node = cluster.nodes().get(0);
-
Metadata metadata = createMetadata();
- metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
-
MockClient client = new MockClient(time, metadata);
- client.setNode(node);
+
+ initMetadata(client, Collections.singletonMap(topic, 1));
+ Cluster cluster = metadata.fetch();
+
PartitionAssignor assignor = new RoundRobinAssignor();
String invalidTopicName = "topic abc"; // Invalid topic name due to space
- Set<String> invalidTopic = new HashSet<String>();
- invalidTopic.add(invalidTopicName);
- Cluster metadataUpdateResponseCluster = new Cluster(cluster.clusterResource().clusterId(),
- cluster.nodes(),
- new ArrayList<PartitionInfo>(0),
- Collections.<String>emptySet(),
- invalidTopic,
- cluster.internalTopics(),
- cluster.controller());
- client.prepareMetadataUpdate(metadataUpdateResponseCluster, Collections.<String>emptySet());
-
+ List<MetadataResponse.TopicMetadata> topicMetadata = new ArrayList<>();
+ topicMetadata.add(new MetadataResponse.TopicMetadata(Errors.INVALID_TOPIC_EXCEPTION,
+ invalidTopicName, false, Collections.emptyList()));
+ MetadataResponse updateResponse = new MetadataResponse(cluster.nodes(),
+ cluster.clusterResource().clusterId(),
+ cluster.controller().id(),
+ topicMetadata);
+ client.prepareMetadataUpdate(updateResponse);
KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, true);
consumer.subscribe(singleton(invalidTopicName), getConsumerRebalanceListener(consumer));
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
index 004445f..a0e7ab0 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
@@ -18,7 +18,6 @@ package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MockClient;
-import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.WakeupException;
@@ -52,6 +51,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import static java.util.Collections.emptyMap;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotSame;
@@ -87,18 +87,15 @@ public class AbstractCoordinatorTest {
private void setupCoordinator(int retryBackoffMs, int rebalanceTimeoutMs) {
this.mockTime = new MockTime();
- this.mockClient = new MockClient(mockTime);
-
Metadata metadata = new Metadata(retryBackoffMs, 60 * 60 * 1000L, true);
+
+ this.mockClient = new MockClient(mockTime, metadata);
this.consumerClient = new ConsumerNetworkClient(new LogContext(), mockClient, metadata, mockTime,
retryBackoffMs, REQUEST_TIMEOUT_MS, HEARTBEAT_INTERVAL_MS);
Metrics metrics = new Metrics();
- Cluster cluster = TestUtils.singletonCluster("topic", 1);
- metadata.update(cluster, Collections.emptySet(), mockTime.milliseconds());
- this.node = cluster.nodes().get(0);
- mockClient.setNode(node);
-
+ mockClient.updateMetadata(TestUtils.metadataUpdateWith(1, emptyMap()));
+ this.node = metadata.fetch().nodes().get(0);
this.coordinatorNode = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
this.coordinator = new DummyCoordinator(consumerClient, metrics, mockTime, rebalanceTimeoutMs, retryBackoffMs);
}
@@ -244,11 +241,11 @@ public class AbstractCoordinatorTest {
public void testLookupCoordinator() {
setupCoordinator();
- mockClient.setNode(null);
+ mockClient.blackout(node, 50);
RequestFuture<Void> noBrokersAvailableFuture = coordinator.lookupCoordinator();
assertTrue("Failed future expected", noBrokersAvailableFuture.failed());
+ mockTime.sleep(50);
- mockClient.setNode(node);
RequestFuture<Void> future = coordinator.lookupCoordinator();
assertFalse("Request not sent", future.isDone());
assertSame("New request sent while one is in progress", future, coordinator.lookupCoordinator());
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index af073f1..72808c8 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -28,7 +28,6 @@ import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.consumer.RangeAssignor;
import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
import org.apache.kafka.clients.consumer.RoundRobinAssignor;
-import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
@@ -40,6 +39,7 @@ import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.OffsetMetadataTooLarge;
import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.RecordBatch;
@@ -51,6 +51,7 @@ import org.apache.kafka.common.requests.JoinGroupRequest.ProtocolMetadata;
import org.apache.kafka.common.requests.JoinGroupResponse;
import org.apache.kafka.common.requests.LeaveGroupRequest;
import org.apache.kafka.common.requests.LeaveGroupResponse;
+import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.OffsetCommitRequest;
import org.apache.kafka.common.requests.OffsetCommitResponse;
import org.apache.kafka.common.requests.OffsetFetchResponse;
@@ -114,13 +115,13 @@ public class ConsumerCoordinatorTest {
private MockPartitionAssignor partitionAssignor = new MockPartitionAssignor();
private List<PartitionAssignor> assignors = Collections.singletonList(partitionAssignor);
private MockClient client;
- private Cluster cluster = TestUtils.clusterWith(1, new HashMap<String, Integer>() {
+ private MetadataResponse metadataResponse = TestUtils.metadataUpdateWith(1, new HashMap<String, Integer>() {
{
put(topic1, 1);
put(topic2, 1);
}
});
- private Node node = cluster.nodes().get(0);
+ private Node node = metadataResponse.brokers().iterator().next();
private SubscriptionState subscriptions;
private Metadata metadata;
private Metrics metrics;
@@ -133,8 +134,8 @@ public class ConsumerCoordinatorTest {
public void setup() {
this.subscriptions = new SubscriptionState(OffsetResetStrategy.EARLIEST);
this.metadata = new Metadata(0, Long.MAX_VALUE, true);
- this.metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
this.client = new MockClient(time, metadata);
+ this.client.updateMetadata(metadataResponse);
this.consumerClient = new ConsumerNetworkClient(new LogContext(), client, metadata, time, 100,
requestTimeoutMs, Integer.MAX_VALUE);
this.metrics = new Metrics(time);
@@ -142,7 +143,6 @@ public class ConsumerCoordinatorTest {
this.mockOffsetCommitCallback = new MockCommitCallback();
this.partitionAssignor.clear();
- client.setNode(node);
this.coordinator = buildCoordinator(metrics, assignors, ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, false, true);
}
@@ -366,7 +366,7 @@ public class ConsumerCoordinatorTest {
// ensure metadata is up-to-date for leader
metadata.setTopics(singletonList(topic1));
- metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
+ client.updateMetadata(metadataResponse);
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
@@ -384,7 +384,7 @@ public class ConsumerCoordinatorTest {
// ensure metadata is up-to-date for leader
metadata.setTopics(singletonList(topic1));
- metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
+ client.updateMetadata(metadataResponse);
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
@@ -423,7 +423,7 @@ public class ConsumerCoordinatorTest {
// partially update the metadata with one topic first,
// let the leader to refresh metadata during assignment
metadata.setTopics(singletonList(topic1));
- metadata.update(TestUtils.singletonCluster(topic1, 1), Collections.<String>emptySet(), time.milliseconds());
+ client.updateMetadata(TestUtils.metadataUpdateWith(1, singletonMap(topic1, 1)));
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
@@ -443,7 +443,7 @@ public class ConsumerCoordinatorTest {
}
}, syncGroupResponse(Arrays.asList(t1p, t2p), Errors.NONE));
// expect client to force updating the metadata, if yes gives it both topics
- client.prepareMetadataUpdate(cluster, Collections.<String>emptySet());
+ client.prepareMetadataUpdate(metadataResponse);
coordinator.poll(time.timer(Long.MAX_VALUE));
@@ -463,7 +463,7 @@ public class ConsumerCoordinatorTest {
subscriptions.subscribe(Pattern.compile(".*"), rebalanceListener);
metadata.needMetadataForAllTopics(true);
- metadata.update(TestUtils.singletonCluster(topic1, 1), Collections.<String>emptySet(), time.milliseconds());
+ client.updateMetadata(TestUtils.metadataUpdateWith(1, singletonMap(topic1, 1)));
assertEquals(singleton(topic1), subscriptions.subscription());
@@ -484,7 +484,7 @@ public class ConsumerCoordinatorTest {
final Map<String, Integer> updatedPartitions = new HashMap<>();
for (String topic : updatedSubscription)
updatedPartitions.put(topic, 1);
- metadata.update(TestUtils.clusterWith(1, updatedPartitions), Collections.<String>emptySet(), time.milliseconds());
+ client.updateMetadata(TestUtils.metadataUpdateWith(1, updatedPartitions));
return true;
}
}, syncGroupResponse(singletonList(t1p), Errors.NONE));
@@ -526,15 +526,14 @@ public class ConsumerCoordinatorTest {
final String consumerId = "consumer";
subscriptions.subscribe(Pattern.compile(".*"), rebalanceListener);
- metadata.update(TestUtils.singletonCluster(topic1, 1), Collections.<String>emptySet(),
- time.milliseconds());
+ client.updateMetadata(TestUtils.metadataUpdateWith(1, singletonMap(topic1, 1)));
assertEquals(singleton(topic1), subscriptions.subscription());
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
// Instrument the test so that metadata will contain two topics after next refresh.
- client.prepareMetadataUpdate(cluster, Collections.emptySet());
+ client.prepareMetadataUpdate(metadataResponse);
client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE));
client.prepareResponse(new MockClient.RequestMatcher() {
@@ -571,7 +570,7 @@ public class ConsumerCoordinatorTest {
// ensure metadata is up-to-date for leader
metadata.setTopics(singletonList(topic1));
- metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
+ client.updateMetadata(metadataResponse);
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
@@ -671,7 +670,7 @@ public class ConsumerCoordinatorTest {
// partially update the metadata with one topic first,
// let the leader to refresh metadata during assignment
metadata.setTopics(singletonList(topic1));
- metadata.update(TestUtils.singletonCluster(topic1, 1), Collections.<String>emptySet(), time.milliseconds());
+ client.updateMetadata(TestUtils.metadataUpdateWith(1, singletonMap(topic1, 1)));
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
@@ -688,7 +687,7 @@ public class ConsumerCoordinatorTest {
}
}, syncGroupResponse(Arrays.asList(t1p, t2p), Errors.NONE));
// expect client to force updating the metadata, if yes gives it both topics
- client.prepareMetadataUpdate(cluster, Collections.emptySet());
+ client.prepareMetadataUpdate(metadataResponse);
coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE));
@@ -847,7 +846,7 @@ public class ConsumerCoordinatorTest {
// ensure metadata is up-to-date for leader
metadata.setTopics(singletonList(topic1));
- metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
+ client.updateMetadata(metadataResponse);
subscriptions.subscribe(singleton(topic1), rebalanceListener);
@@ -866,7 +865,7 @@ public class ConsumerCoordinatorTest {
assertFalse(coordinator.rejoinNeededOrPending());
// a new partition is added to the topic
- metadata.update(TestUtils.singletonCluster(topic1, 2), Collections.<String>emptySet(), time.milliseconds());
+ metadata.update(TestUtils.metadataUpdateWith(1, singletonMap(topic1, 2)), time.milliseconds());
// we should detect the change and ask for reassignment
assertTrue(coordinator.rejoinNeededOrPending());
@@ -886,7 +885,7 @@ public class ConsumerCoordinatorTest {
metadata.setTopics(topics);
// we only have metadata for one topic initially
- metadata.update(TestUtils.singletonCluster(topic1, 1), Collections.<String>emptySet(), time.milliseconds());
+ client.updateMetadata(TestUtils.metadataUpdateWith(1, singletonMap(topic1, 1)));
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
@@ -907,7 +906,7 @@ public class ConsumerCoordinatorTest {
Map<String, Integer> topicPartitionCounts = new HashMap<>();
topicPartitionCounts.put(topic1, 1);
topicPartitionCounts.put(topic2, 1);
- metadata.update(TestUtils.singletonCluster(topicPartitionCounts), Collections.<String>emptySet(), time.milliseconds());
+ client.updateMetadata(TestUtils.metadataUpdateWith(1, topicPartitionCounts));
return true;
}
return false;
@@ -948,7 +947,7 @@ public class ConsumerCoordinatorTest {
metadata.setTopics(topics);
// we only have metadata for one topic initially
- metadata.update(TestUtils.singletonCluster(topic, 1), Collections.emptySet(), time.milliseconds());
+ client.updateMetadata(TestUtils.metadataUpdateWith(1, singletonMap(topic1, 1)));
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
@@ -977,33 +976,32 @@ public class ConsumerCoordinatorTest {
@Test
public void testRebalanceAfterTopicUnavailableWithSubscribe() {
- unavailableTopicTest(false, false, Collections.<String>emptySet());
+ unavailableTopicTest(false, Collections.emptySet());
}
@Test
public void testRebalanceAfterTopicUnavailableWithPatternSubscribe() {
- unavailableTopicTest(true, false, Collections.<String>emptySet());
+ unavailableTopicTest(true, Collections.emptySet());
}
@Test
- public void testRebalanceAfterNotMatchingTopicUnavailableWithPatternSSubscribe() {
- unavailableTopicTest(true, false, Collections.singleton("notmatching"));
+ public void testRebalanceAfterNotMatchingTopicUnavailableWithPatternSubscribe() {
+ unavailableTopicTest(true, Collections.singleton("notmatching"));
}
@Test
public void testAssignWithTopicUnavailable() {
- unavailableTopicTest(true, false, Collections.<String>emptySet());
+ unavailableTopicTest(true, Collections.emptySet());
}
- private void unavailableTopicTest(boolean patternSubscribe, boolean assign, Set<String> unavailableTopicsInLastMetadata) {
+ private void unavailableTopicTest(boolean patternSubscribe, Set<String> unavailableTopicsInLastMetadata) {
final String consumerId = "consumer";
metadata.setTopics(singletonList(topic1));
- client.prepareMetadataUpdate(Cluster.empty(), Collections.singleton("test1"));
+ client.prepareMetadataUpdate(TestUtils.metadataUpdateWith("kafka-cluster", 1,
+ Collections.singletonMap(topic1, Errors.UNKNOWN_TOPIC_OR_PARTITION), Collections.emptyMap()));
- if (assign)
- subscriptions.assignFromUser(singleton(t1p));
- else if (patternSubscribe)
+ if (patternSubscribe)
subscriptions.subscribe(Pattern.compile("test.*"), rebalanceListener);
else
subscriptions.subscribe(singleton(topic1), rebalanceListener);
@@ -1017,32 +1015,34 @@ public class ConsumerCoordinatorTest {
client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, Errors.NONE));
client.prepareResponse(syncGroupResponse(Collections.<TopicPartition>emptyList(), Errors.NONE));
coordinator.poll(time.timer(Long.MAX_VALUE));
- if (!assign) {
- assertFalse(coordinator.rejoinNeededOrPending());
- assertEquals(Collections.<TopicPartition>emptySet(), rebalanceListener.assigned);
- }
+ assertFalse(coordinator.rejoinNeededOrPending());
+ assertEquals(Collections.<TopicPartition>emptySet(), rebalanceListener.assigned);
assertTrue("Metadata refresh not requested for unavailable partitions", metadata.updateRequested());
- client.prepareMetadataUpdate(cluster, unavailableTopicsInLastMetadata);
+ Map<String, Errors> topicErrors = new HashMap<>();
+ for (String topic : unavailableTopicsInLastMetadata)
+ topicErrors.put(topic, Errors.UNKNOWN_TOPIC_OR_PARTITION);
+
+ client.prepareMetadataUpdate(TestUtils.metadataUpdateWith("kafka-cluster", 1,
+ topicErrors, singletonMap(topic1, 1)));
+
client.poll(0, time.milliseconds());
client.prepareResponse(joinGroupLeaderResponse(2, consumerId, memberSubscriptions, Errors.NONE));
client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
coordinator.poll(time.timer(Long.MAX_VALUE));
assertFalse("Metadata refresh requested unnecessarily", metadata.updateRequested());
- if (!assign) {
- assertFalse(coordinator.rejoinNeededOrPending());
- assertEquals(singleton(t1p), rebalanceListener.assigned);
- }
+ assertFalse(coordinator.rejoinNeededOrPending());
+ assertEquals(singleton(t1p), rebalanceListener.assigned);
}
@Test
public void testExcludeInternalTopicsConfigOption() {
subscriptions.subscribe(Pattern.compile(".*"), rebalanceListener);
- metadata.update(TestUtils.singletonCluster(TestUtils.GROUP_METADATA_TOPIC_NAME, 2), Collections.<String>emptySet(), time.milliseconds());
+ client.updateMetadata(TestUtils.metadataUpdateWith(1, singletonMap(Topic.GROUP_METADATA_TOPIC_NAME, 1)));
- assertFalse(subscriptions.subscription().contains(TestUtils.GROUP_METADATA_TOPIC_NAME));
+ assertFalse(subscriptions.subscription().contains(Topic.GROUP_METADATA_TOPIC_NAME));
}
@Test
@@ -1050,9 +1050,9 @@ public class ConsumerCoordinatorTest {
coordinator = buildCoordinator(new Metrics(), assignors, false, false, true);
subscriptions.subscribe(Pattern.compile(".*"), rebalanceListener);
- metadata.update(TestUtils.singletonCluster(TestUtils.GROUP_METADATA_TOPIC_NAME, 2), Collections.<String>emptySet(), time.milliseconds());
+ client.updateMetadata(TestUtils.metadataUpdateWith(1, singletonMap(Topic.GROUP_METADATA_TOPIC_NAME, 2)));
- assertTrue(subscriptions.subscription().contains(TestUtils.GROUP_METADATA_TOPIC_NAME));
+ assertTrue(subscriptions.subscription().contains(Topic.GROUP_METADATA_TOPIC_NAME));
}
@Test
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
index 8f6328d..45b420e 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
@@ -50,10 +50,10 @@ public class ConsumerNetworkClientTest {
private String topicName = "test";
private MockTime time = new MockTime(1);
- private MockClient client = new MockClient(time);
private Cluster cluster = TestUtils.singletonCluster(topicName, 1);
private Node node = cluster.nodes().get(0);
private Metadata metadata = new Metadata(0, Long.MAX_VALUE, true);
+ private MockClient client = new MockClient(time, metadata);
private ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(new LogContext(),
client, metadata, time, 100, 1000, Integer.MAX_VALUE);
@@ -273,7 +273,7 @@ public class ConsumerNetworkClientTest {
int requestTimeoutMs = 10;
final AtomicBoolean isReady = new AtomicBoolean();
final AtomicBoolean disconnected = new AtomicBoolean();
- client = new MockClient(time) {
+ client = new MockClient(time, metadata) {
@Override
public boolean ready(Node node, long now) {
if (isReady.get())
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 94d8d5b..c7b0b30 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -113,6 +113,7 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import static java.util.Collections.singleton;
+import static java.util.Collections.singletonMap;
import static org.apache.kafka.common.requests.FetchMetadata.INVALID_SESSION_ID;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
@@ -122,7 +123,6 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-
@SuppressWarnings("deprecation")
public class FetcherTest {
private ConsumerRebalanceListener listener = new NoOpConsumerRebalanceListener();
@@ -140,12 +140,12 @@ public class FetcherTest {
private long retryBackoffMs = 100;
private long requestTimeoutMs = 30000;
private MockTime time = new MockTime(1);
+ private MetadataResponse initialUpdateResponse = TestUtils.metadataUpdateWith(1, singletonMap(topicName, 4));
private Metadata metadata = new Metadata(0, Long.MAX_VALUE, true);
private MockClient client = new MockClient(time, metadata);
- private Cluster cluster = TestUtils.singletonCluster(topicName, 4);
- private Node node = cluster.nodes().get(0);
+ private Node node;
private Metrics metrics = new Metrics(time);
- FetcherMetricsRegistry metricsRegistry = new FetcherMetricsRegistry("consumer" + groupId);
+ private FetcherMetricsRegistry metricsRegistry = new FetcherMetricsRegistry("consumer" + groupId);
private SubscriptionState subscriptions = new SubscriptionState(OffsetResetStrategy.EARLIEST);
private SubscriptionState subscriptionsNoAutoReset = new SubscriptionState(OffsetResetStrategy.NONE);
@@ -163,10 +163,9 @@ public class FetcherTest {
private ExecutorService executorService;
@Before
- public void setup() throws Exception {
- metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
- client.setNode(node);
-
+ public void setup() {
+ client.updateMetadata(initialUpdateResponse);
+ node = metadata.fetch().nodes().get(0);
records = buildRecords(1L, 3, 1);
nextRecords = buildRecords(4L, 2, 4);
emptyRecords = buildRecords(0L, 0, 0);
@@ -1204,7 +1203,7 @@ public class FetcherTest {
assertFalse(subscriptions.hasValidPosition(tp0));
// Expect a metadata refresh
- client.prepareMetadataUpdate(cluster, Collections.<String>emptySet());
+ client.prepareMetadataUpdate(initialUpdateResponse);
consumerClient.pollNoWakeup();
assertFalse(client.hasPendingMetadataUpdates());
@@ -1233,7 +1232,7 @@ public class FetcherTest {
assertFalse(subscriptions.hasValidPosition(tp0));
// Expect a metadata refresh
- client.prepareMetadataUpdate(cluster, Collections.<String>emptySet());
+ client.prepareMetadataUpdate(initialUpdateResponse);
consumerClient.pollNoWakeup();
assertFalse(client.hasPendingMetadataUpdates());
@@ -1436,7 +1435,7 @@ public class FetcherTest {
Map<String, List<PartitionInfo>> allTopics = fetcher.getAllTopicMetadata(time.timer(5000L));
- assertEquals(cluster.topics().size(), allTopics.size());
+ assertEquals(initialUpdateResponse.topicMetadata().size(), allTopics.size());
}
@Test
@@ -1445,7 +1444,7 @@ public class FetcherTest {
client.prepareResponse(null, true);
client.prepareResponse(newMetadataResponse(topicName, Errors.NONE));
Map<String, List<PartitionInfo>> allTopics = fetcher.getAllTopicMetadata(time.timer(5000L));
- assertEquals(cluster.topics().size(), allTopics.size());
+ assertEquals(initialUpdateResponse.topicMetadata().size(), allTopics.size());
}
@Test(expected = TimeoutException.class)
@@ -1535,7 +1534,7 @@ public class FetcherTest {
Assert.assertNotNull(topicMetadata);
Assert.assertNotNull(topicMetadata.get(topicName));
//noinspection ConstantConditions
- Assert.assertEquals((int) cluster.partitionCountForTopic(topicName), topicMetadata.get(topicName).size());
+ Assert.assertEquals((int) metadata.fetch().partitionCountForTopic(topicName), topicMetadata.get(topicName).size());
}
/*
@@ -1717,8 +1716,7 @@ public class FetcherTest {
Map<String, Integer> partitionCounts = new HashMap<>();
partitionCounts.put(topic1, 1);
partitionCounts.put(topic2, 1);
- Cluster cluster = TestUtils.clusterWith(1, partitionCounts);
- metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
+ client.updateMetadata(TestUtils.metadataUpdateWith(1, partitionCounts));
subscriptions.assignFromUser(Utils.mkSet(tp1, tp2));
@@ -1950,21 +1948,24 @@ public class FetcherTest {
client.reset();
// Metadata initially has one topic
- Cluster cluster = TestUtils.clusterWith(3, topicName, 2);
- metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
+ MetadataResponse initialMetadata = TestUtils.metadataUpdateWith(3, singletonMap(topicName, 2));
+ client.updateMetadata(initialMetadata);
// The first metadata refresh should contain one topic
- client.prepareMetadataUpdate(cluster, Collections.<String>emptySet(), false);
- client.prepareResponseFrom(listOffsetResponse(tp0, Errors.NONE, 1000L, 11L), cluster.leaderFor(tp0));
- client.prepareResponseFrom(listOffsetResponse(tp1, Errors.NONE, 1000L, 32L), cluster.leaderFor(tp1));
+ client.prepareMetadataUpdate(initialMetadata, false);
+ client.prepareResponseFrom(listOffsetResponse(tp0, Errors.NONE, 1000L, 11L),
+ metadata.fetch().leaderFor(tp0));
+ client.prepareResponseFrom(listOffsetResponse(tp1, Errors.NONE, 1000L, 32L),
+ metadata.fetch().leaderFor(tp1));
// Second metadata refresh should contain two topics
Map<String, Integer> partitionNumByTopic = new HashMap<>();
partitionNumByTopic.put(topicName, 2);
partitionNumByTopic.put(anotherTopic, 1);
- Cluster updatedCluster = TestUtils.clusterWith(3, partitionNumByTopic);
- client.prepareMetadataUpdate(updatedCluster, Collections.<String>emptySet(), false);
- client.prepareResponseFrom(listOffsetResponse(t2p0, Errors.NONE, 1000L, 54L), cluster.leaderFor(t2p0));
+ MetadataResponse updatedMetadata = TestUtils.metadataUpdateWith(3, partitionNumByTopic);
+ client.prepareMetadataUpdate(updatedMetadata, false);
+ client.prepareResponseFrom(listOffsetResponse(t2p0, Errors.NONE, 1000L, 54L),
+ metadata.fetch().leaderFor(t2p0));
Map<TopicPartition, Long> timestampToSearch = new HashMap<>();
timestampToSearch.put(tp0, ListOffsetRequest.LATEST_TIMESTAMP);
@@ -2518,9 +2519,11 @@ public class FetcherTest {
Set<TopicPartition> topicPartitions = new HashSet<>();
for (int i = 0; i < numPartitions; i++)
topicPartitions.add(new TopicPartition(topicName, i));
- cluster = TestUtils.singletonCluster(topicName, numPartitions);
- metadata.update(cluster, Collections.emptySet(), time.milliseconds());
- client.setNode(node);
+
+ MetadataResponse initialMetadataResponse = TestUtils.metadataUpdateWith(1,
+ singletonMap(topicName, numPartitions));
+ client.updateMetadata(initialMetadataResponse);
+ node = metadata.fetch().nodes().get(0);
fetchSize = 10000;
Fetcher<byte[], byte[]> fetcher = new Fetcher<byte[], byte[]>(
@@ -2735,23 +2738,28 @@ public class FetcherTest {
String topicName2 = "topic2";
TopicPartition t2p0 = new TopicPartition(topicName2, 0);
// Expect a metadata refresh.
- metadata.update(Cluster.bootstrap(ClientUtils.parseAndValidateAddresses(Collections.singletonList("1.1.1.1:1111"), ClientDnsLookup.DEFAULT.toString())),
- Collections.<String>emptySet(),
- time.milliseconds());
+ metadata.bootstrap(ClientUtils.parseAndValidateAddresses(Collections.singletonList("1.1.1.1:1111"),
+ ClientDnsLookup.DEFAULT), time.milliseconds());
Map<String, Integer> partitionNumByTopic = new HashMap<>();
partitionNumByTopic.put(topicName, 2);
partitionNumByTopic.put(topicName2, 1);
- cluster = TestUtils.clusterWith(2, partitionNumByTopic);
+ MetadataResponse updateMetadataResponse = TestUtils.metadataUpdateWith(2, partitionNumByTopic);
+ Cluster updatedCluster = updateMetadataResponse.cluster();
+
// The metadata refresh should contain all the topics.
- client.prepareMetadataUpdate(cluster, Collections.<String>emptySet(), true);
+ client.prepareMetadataUpdate(updateMetadataResponse, true);
// First try should fail due to metadata error.
- client.prepareResponseFrom(listOffsetResponse(t2p0, errorForP0, offsetForP0, offsetForP0), cluster.leaderFor(t2p0));
- client.prepareResponseFrom(listOffsetResponse(tp1, errorForP1, offsetForP1, offsetForP1), cluster.leaderFor(tp1));
+ client.prepareResponseFrom(listOffsetResponse(t2p0, errorForP0, offsetForP0, offsetForP0),
+ updatedCluster.leaderFor(t2p0));
+ client.prepareResponseFrom(listOffsetResponse(tp1, errorForP1, offsetForP1, offsetForP1),
+ updatedCluster.leaderFor(tp1));
// Second try should succeed.
- client.prepareResponseFrom(listOffsetResponse(t2p0, Errors.NONE, offsetForP0, offsetForP0), cluster.leaderFor(t2p0));
- client.prepareResponseFrom(listOffsetResponse(tp1, Errors.NONE, offsetForP1, offsetForP1), cluster.leaderFor(tp1));
+ client.prepareResponseFrom(listOffsetResponse(t2p0, Errors.NONE, offsetForP0, offsetForP0),
+ updatedCluster.leaderFor(t2p0));
+ client.prepareResponseFrom(listOffsetResponse(tp1, Errors.NONE, offsetForP1, offsetForP1),
+ updatedCluster.leaderFor(tp1));
Map<TopicPartition, Long> timestampToSearch = new HashMap<>();
timestampToSearch.put(t2p0, 0L);
@@ -2777,15 +2785,16 @@ public class FetcherTest {
private void testGetOffsetsForTimesWithUnknownOffset() {
client.reset();
// Ensure metadata has both partition.
- Cluster cluster = TestUtils.clusterWith(1, topicName, 1);
- metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
+ MetadataResponse initialMetadataUpdate = TestUtils.metadataUpdateWith(1, singletonMap(topicName, 1));
+ client.updateMetadata(initialMetadataUpdate);
Map<TopicPartition, ListOffsetResponse.PartitionData> partitionData = new HashMap<>();
partitionData.put(tp0, new ListOffsetResponse.PartitionData(Errors.NONE,
ListOffsetResponse.UNKNOWN_TIMESTAMP, ListOffsetResponse.UNKNOWN_OFFSET,
Optional.empty()));
- client.prepareResponseFrom(new ListOffsetResponse(0, partitionData), cluster.leaderFor(tp0));
+ client.prepareResponseFrom(new ListOffsetResponse(0, partitionData),
+ metadata.fetch().leaderFor(tp0));
Map<TopicPartition, Long> timestampToSearch = new HashMap<>();
timestampToSearch.put(tp0, 0L);
@@ -2851,20 +2860,20 @@ public class FetcherTest {
private MetadataResponse newMetadataResponse(String topic, Errors error) {
List<MetadataResponse.PartitionMetadata> partitionsMetadata = new ArrayList<>();
if (error == Errors.NONE) {
- for (PartitionInfo partitionInfo : cluster.partitionsForTopic(topic)) {
- partitionsMetadata.add(new MetadataResponse.PartitionMetadata(
- Errors.NONE,
- partitionInfo.partition(),
- partitionInfo.leader(),
- Optional.empty(),
- Arrays.asList(partitionInfo.replicas()),
- Arrays.asList(partitionInfo.inSyncReplicas()),
- Arrays.asList(partitionInfo.offlineReplicas())));
- }
+ Optional<MetadataResponse.TopicMetadata> foundMetadata = initialUpdateResponse.topicMetadata()
+ .stream()
+ .filter(topicMetadata -> topicMetadata.topic().equals(topic))
+ .findFirst();
+ foundMetadata.ifPresent(topicMetadata -> {
+ partitionsMetadata.addAll(topicMetadata.partitionMetadata());
+ });
}
- MetadataResponse.TopicMetadata topicMetadata = new MetadataResponse.TopicMetadata(error, topic, false, partitionsMetadata);
- return new MetadataResponse(cluster.nodes(), null, MetadataResponse.NO_CONTROLLER_ID, Arrays.asList(topicMetadata));
+ MetadataResponse.TopicMetadata topicMetadata = new MetadataResponse.TopicMetadata(error, topic, false,
+ partitionsMetadata);
+ List<Node> brokers = new ArrayList<>(initialUpdateResponse.brokers());
+ return new MetadataResponse(brokers, initialUpdateResponse.clusterId(),
+ initialUpdateResponse.controller().id(), Collections.singletonList(topicMetadata));
}
private Fetcher<byte[], byte[]> createFetcher(SubscriptionState subscriptions,
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index b5d7709..b544a65 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -16,9 +16,6 @@
*/
package org.apache.kafka.clients.producer;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.Set;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.Metadata;
@@ -37,6 +34,8 @@ import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.network.Selectable;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.ExtendedSerializer;
import org.apache.kafka.common.serialization.Serializer;
@@ -52,10 +51,12 @@ import org.apache.kafka.test.TestUtils;
import org.junit.Assert;
import org.junit.Test;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
@@ -66,6 +67,8 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import static org.junit.Assert.assertArrayEquals;
+import static java.util.Collections.emptyMap;
+import static java.util.Collections.singletonMap;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -209,20 +212,16 @@ public class KafkaProducerTest {
@Test
public void shouldCloseProperlyAndThrowIfInterrupted() throws Exception {
- Map<String, Object> configs = new HashMap();
+ Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
configs.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MockPartitioner.class.getName());
configs.put(ProducerConfig.BATCH_SIZE_CONFIG, "1");
Time time = new MockTime();
- Cluster cluster = TestUtils.singletonCluster("topic", 1);
- Node node = cluster.nodes().get(0);
-
+ MetadataResponse initialUpdateResponse = TestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
Metadata metadata = new Metadata(0, Long.MAX_VALUE, true);
- metadata.update(cluster, Collections.emptySet(), time.milliseconds());
-
MockClient client = new MockClient(time, metadata);
- client.setNode(node);
+ client.updateMetadata(initialUpdateResponse);
final Producer<String, String> producer = new KafkaProducer<>(configs, new StringSerializer(),
new StringSerializer(), metadata, client, null, time);
@@ -443,7 +442,9 @@ public class KafkaProducerTest {
for (int i = 0; i < 10; i++) {
while (!metadata.updateRequested() && System.currentTimeMillis() - startTimeMs < 1000)
Thread.yield();
- metadata.update(Cluster.empty(), Collections.singleton(topic), time.milliseconds());
+ MetadataResponse updateResponse = TestUtils.metadataUpdateWith("kafka-cluster", 1,
+ singletonMap(topic, Errors.UNKNOWN_TOPIC_OR_PARTITION), emptyMap());
+ metadata.update(updateResponse, time.milliseconds());
time.sleep(60 * 1000L);
}
});
@@ -479,14 +480,9 @@ public class KafkaProducerTest {
Serializer<String> valueSerializer = mock(serializerClassToMock);
String topic = "topic";
- final Cluster cluster = new Cluster(
- "dummy",
- Collections.singletonList(new Node(0, "host1", 1000)),
- Collections.singletonList(new PartitionInfo(topic, 0, null, new Node[0], new Node[0])),
- Collections.emptySet(),
- Collections.emptySet());
Metadata metadata = new Metadata(0, 90000, true);
- metadata.update(cluster, Collections.emptySet(), Time.SYSTEM.milliseconds());
+ MetadataResponse initialUpdateResponse = TestUtils.metadataUpdateWith(1, singletonMap(topic, 1));
+ metadata.update(initialUpdateResponse, Time.SYSTEM.milliseconds());
KafkaProducer<String, String> producer = new KafkaProducer<>(configs, keySerializer, valueSerializer, metadata,
null, null, Time.SYSTEM);
@@ -553,13 +549,8 @@ public class KafkaProducerTest {
ProducerRecord<String, String> record = new ProducerRecord<>(topic, "value");
Metadata metadata = new Metadata(0, 90000, true);
- final Cluster cluster = new Cluster(
- "dummy",
- Collections.singletonList(new Node(0, "host1", 1000)),
- Collections.singletonList(new PartitionInfo(topic, 0, null, new Node[0], new Node[0])),
- Collections.emptySet(),
- Collections.emptySet());
- metadata.update(cluster, Collections.emptySet(), Time.SYSTEM.milliseconds());
+ MetadataResponse initialUpdateResponse = TestUtils.metadataUpdateWith(1, singletonMap(topic, 1));
+ metadata.update(initialUpdateResponse, Time.SYSTEM.milliseconds());
@SuppressWarnings("unchecked") // it is safe to suppress, since this is a mock class
ProducerInterceptors<String, String> interceptors = mock(ProducerInterceptors.class);
@@ -595,15 +586,12 @@ public class KafkaProducerTest {
configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5);
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
- Time time = Time.SYSTEM;
- Cluster cluster = TestUtils.singletonCluster("topic", 1);
- Node node = cluster.nodes().get(0);
-
+ Time time = new MockTime(1);
+ MetadataResponse initialUpdateResponse = TestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
Metadata metadata = new Metadata(0, Long.MAX_VALUE, true);
- metadata.update(cluster, Collections.emptySet(), time.milliseconds());
+ metadata.update(initialUpdateResponse, time.milliseconds());
MockClient client = new MockClient(time, metadata);
- client.setNode(node);
try (Producer<String, String> producer = new KafkaProducer<>(configs, new StringSerializer(),
new StringSerializer(), metadata, client, null, time)) {
@@ -620,14 +608,11 @@ public class KafkaProducerTest {
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
Time time = new MockTime();
- Cluster cluster = TestUtils.singletonCluster("topic", 1);
- Node node = cluster.nodes().get(0);
-
+ MetadataResponse initialUpdateResponse = TestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
Metadata metadata = new Metadata(0, Long.MAX_VALUE, true);
- metadata.update(cluster, Collections.emptySet(), time.milliseconds());
+ metadata.update(initialUpdateResponse, time.milliseconds());
MockClient client = new MockClient(time, metadata);
- client.setNode(node);
Producer<String, String> producer = new KafkaProducer<>(configs, new StringSerializer(), new StringSerializer(),
metadata, client, null, time);
@@ -651,14 +636,11 @@ public class KafkaProducerTest {
configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "15000");
Time time = new MockTime();
- Cluster cluster = TestUtils.singletonCluster();
- Node node = cluster.nodes().get(0);
-
+ MetadataResponse initialUpdateResponse = TestUtils.metadataUpdateWith(1, emptyMap());
Metadata metadata = new Metadata(0, Long.MAX_VALUE, true);
- metadata.update(cluster, Collections.emptySet(), time.milliseconds());
+ metadata.update(initialUpdateResponse, time.milliseconds());
MockClient client = new MockClient(time, metadata);
- client.setNode(node);
Producer<String, String> producer = new KafkaProducer<>(configs, new StringSerializer(), new StringSerializer(),
metadata, client, null, time);
@@ -666,20 +648,19 @@ public class KafkaProducerTest {
String invalidTopicName = "topic abc"; // Invalid topic name due to space
ProducerRecord<String, String> record = new ProducerRecord<>(invalidTopicName, "HelloKafka");
- Set<String> invalidTopic = new HashSet<>();
- invalidTopic.add(invalidTopicName);
- Cluster metaDataUpdateResponseCluster = new Cluster(cluster.clusterResource().clusterId(),
- cluster.nodes(),
- new ArrayList<>(0),
- Collections.emptySet(),
- invalidTopic,
- cluster.internalTopics(),
- cluster.controller());
- client.prepareMetadataUpdate(metaDataUpdateResponseCluster, Collections.emptySet());
+ List<MetadataResponse.TopicMetadata> topicMetadata = new ArrayList<>();
+ topicMetadata.add(new MetadataResponse.TopicMetadata(Errors.INVALID_TOPIC_EXCEPTION,
+ invalidTopicName, false, Collections.emptyList()));
+ MetadataResponse updateResponse = new MetadataResponse(
+ new ArrayList<>(initialUpdateResponse.brokers()),
+ initialUpdateResponse.clusterId(),
+ initialUpdateResponse.controller().id(),
+ topicMetadata);
+ client.prepareMetadataUpdate(updateResponse);
Future<RecordMetadata> future = producer.send(record);
- assertEquals("Cluster has incorrect invalid topic list", metaDataUpdateResponseCluster.invalidTopics(),
+ assertEquals("Cluster has incorrect invalid topic list.", Collections.singleton(invalidTopicName),
metadata.fetch().invalidTopics());
TestUtils.assertFutureError(future, InvalidTopicException.class);
@@ -697,12 +678,10 @@ public class KafkaProducerTest {
// return with a KafkaException.
String topicName = "test";
Time time = new MockTime();
- Cluster cluster = TestUtils.singletonCluster();
- Node node = cluster.nodes().get(0);
- Metadata metadata = new Metadata(0, Long.MAX_VALUE, false);
- metadata.update(cluster, Collections.emptySet(), time.milliseconds());
+ MetadataResponse initialUpdateResponse = TestUtils.metadataUpdateWith(1, emptyMap());
+ Metadata metadata = new Metadata(0, Long.MAX_VALUE, true);
+ metadata.update(initialUpdateResponse, time.milliseconds());
MockClient client = new MockClient(time, metadata);
- client.setNode(node);
Producer<String, String> producer = new KafkaProducer<>(configs, new StringSerializer(), new StringSerializer(),
metadata, client, null, time);
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 8a8ddd3..606637e 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -73,6 +73,7 @@ import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.InitProducerIdRequest;
import org.apache.kafka.common.requests.InitProducerIdResponse;
+import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.ProduceRequest;
import org.apache.kafka.common.requests.ProduceResponse;
import org.apache.kafka.common.requests.ResponseHeader;
@@ -115,11 +116,10 @@ public class SenderTest {
private TopicPartition tp0 = new TopicPartition("test", 0);
private TopicPartition tp1 = new TopicPartition("test", 1);
private MockTime time = new MockTime();
- private MockClient client = new MockClient(time);
private int batchSize = 16 * 1024;
private Metadata metadata = new Metadata(0, Long.MAX_VALUE, true, true, new ClusterResourceListeners());
+ private MockClient client = new MockClient(time, metadata);
private ApiVersions apiVersions = new ApiVersions();
- private Cluster cluster = TestUtils.singletonCluster("test", 2);
private Metrics metrics = null;
private RecordAccumulator accumulator = null;
private Sender sender = null;
@@ -128,7 +128,6 @@ public class SenderTest {
@Before
public void setup() {
- client.setNode(cluster.nodes().get(0));
setupWithTransactionState(null);
}
@@ -395,8 +394,8 @@ public class SenderTest {
Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
senderMetrics, time, REQUEST_TIMEOUT, 50, null, apiVersions);
// Create a two broker cluster, with partition 0 on broker 0 and partition 1 on broker 1
- Cluster cluster1 = TestUtils.clusterWith(2, "test", 2);
- metadata.update(cluster1, Collections.emptySet(), time.milliseconds());
+ MetadataResponse metadataUpdate1 = TestUtils.metadataUpdateWith(2, Collections.singletonMap("test", 2));
+ client.prepareMetadataUpdate(metadataUpdate1);
// Send the first message.
TopicPartition tp2 = new TopicPartition("test", 1);
@@ -416,8 +415,8 @@ public class SenderTest {
accumulator.append(tp2, 0L, "key2".getBytes(), "value2".getBytes(), null, null, MAX_BLOCK_TIMEOUT);
// Update metadata before sender receives response from broker 0. Now partition 2 moves to broker 0
- Cluster cluster2 = TestUtils.singletonCluster("test", 2);
- metadata.update(cluster2, Collections.emptySet(), time.milliseconds());
+ MetadataResponse metadataUpdate2 = TestUtils.metadataUpdateWith(1, Collections.singletonMap("test", 2));
+ client.prepareMetadataUpdate(metadataUpdate2);
// Sender should not send the second message to node 0.
assertEquals(1, sender.inFlightBatches(tp2).size());
sender.run(time.milliseconds()); // receive the response for the previous send, and send the new batch
@@ -458,9 +457,9 @@ public class SenderTest {
// Advance the clock to expire the first batch.
time.sleep(10000);
- Node clusterNode = this.cluster.nodes().get(0);
+ Node clusterNode = metadata.fetch().nodes().get(0);
Map<Integer, List<ProducerBatch>> drainedBatches =
- accumulator.drain(cluster, Collections.singleton(clusterNode), Integer.MAX_VALUE, time.milliseconds());
+ accumulator.drain(metadata.fetch(), Collections.singleton(clusterNode), Integer.MAX_VALUE, time.milliseconds());
sender.addToInflightBatches(drainedBatches);
// Disconnect the target node for the pending produce request. This will ensure that sender will try to
@@ -484,12 +483,12 @@ public class SenderTest {
@Test
public void testMetadataTopicExpiry() throws Exception {
long offset = 0;
- metadata.update(Cluster.empty(), Collections.<String>emptySet(), time.milliseconds());
+ client.updateMetadata(TestUtils.metadataUpdateWith(1, Collections.emptyMap()));
Future<RecordMetadata> future = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
sender.run(time.milliseconds());
assertTrue("Topic not added to metadata", metadata.containsTopic(tp0.topic()));
- metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
+ client.updateMetadata(TestUtils.metadataUpdateWith(1, Collections.singletonMap("test", 2)));
sender.run(time.milliseconds()); // send produce request
client.respond(produceResponse(tp0, offset++, Errors.NONE, 0));
sender.run(time.milliseconds());
@@ -501,12 +500,12 @@ public class SenderTest {
assertTrue("Topic not retained in metadata list", metadata.containsTopic(tp0.topic()));
time.sleep(Metadata.TOPIC_EXPIRY_MS);
- metadata.update(Cluster.empty(), Collections.<String>emptySet(), time.milliseconds());
+ client.updateMetadata(TestUtils.metadataUpdateWith(1, Collections.emptyMap()));
assertFalse("Unused topic has not been expired", metadata.containsTopic(tp0.topic()));
future = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
sender.run(time.milliseconds());
assertTrue("Topic not added to metadata", metadata.containsTopic(tp0.topic()));
- metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
+ client.updateMetadata(TestUtils.metadataUpdateWith(1, Collections.singletonMap("test", 2)));
sender.run(time.milliseconds()); // send produce request
client.respond(produceResponse(tp0, offset++, Errors.NONE, 0));
sender.run(time.milliseconds());
@@ -522,7 +521,6 @@ public class SenderTest {
final long producerId = 343434L;
TransactionManager transactionManager = new TransactionManager();
setupWithTransactionState(transactionManager);
- client.setNode(new Node(1, "localhost", 33343));
prepareAndReceiveInitProducerId(producerId, Errors.NONE);
assertTrue(transactionManager.hasProducerId());
assertEquals(producerId, transactionManager.producerIdAndEpoch().producerId);
@@ -534,7 +532,6 @@ public class SenderTest {
final long producerId = 343434L;
TransactionManager transactionManager = new TransactionManager();
setupWithTransactionState(transactionManager);
- client.setNode(new Node(1, "localhost", 33343));
prepareAndReceiveInitProducerId(producerId, Errors.CLUSTER_AUTHORIZATION_FAILED);
assertFalse(transactionManager.hasProducerId());
assertTrue(transactionManager.hasError());
@@ -988,7 +985,7 @@ public class SenderTest {
// Send first ProduceRequest
Future<RecordMetadata> request1 = accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
- Node node = this.cluster.nodes().get(0);
+ Node node = metadata.fetch().nodes().get(0);
time.sleep(10000L);
client.disconnect(node.idString());
client.blackout(node, 10);
@@ -1024,7 +1021,7 @@ public class SenderTest {
sender.run(time.milliseconds()); // receive first response
assertEquals(1, sender.inFlightBatches(tp0).size());
- Node node = this.cluster.nodes().get(0);
+ Node node = metadata.fetch().nodes().get(0);
// We add 600 millis to expire the first batch but not the second.
// Note deliveryTimeoutMs is 1500.
time.sleep(600L);
@@ -1088,7 +1085,7 @@ public class SenderTest {
sendIdempotentProducerResponse(0, tp0, Errors.NOT_LEADER_FOR_PARTITION, -1);
sender.run(time.milliseconds()); // receive first response
- Node node = this.cluster.nodes().get(0);
+ Node node = metadata.fetch().nodes().get(0);
time.sleep(1000L);
client.disconnect(node.idString());
client.blackout(node, 10);
@@ -1136,7 +1133,7 @@ public class SenderTest {
sender.run(time.milliseconds()); // receive response
assertEquals(1L, transactionManager.sequenceNumber(tp0).longValue());
- Node node = this.cluster.nodes().get(0);
+ Node node = metadata.fetch().nodes().get(0);
time.sleep(15000L);
client.disconnect(node.idString());
client.blackout(node, 10);
@@ -1160,7 +1157,6 @@ public class SenderTest {
TransactionManager transactionManager = new TransactionManager();
transactionManager.setProducerIdAndEpoch(new ProducerIdAndEpoch(producerId, (short) 0));
setupWithTransactionState(transactionManager);
- client.setNode(new Node(1, "localhost", 33343));
int maxRetries = 10;
Metrics m = new Metrics();
@@ -1206,7 +1202,6 @@ public class SenderTest {
TransactionManager transactionManager = new TransactionManager();
transactionManager.setProducerIdAndEpoch(new ProducerIdAndEpoch(producerId, (short) 0));
setupWithTransactionState(transactionManager);
- client.setNode(new Node(1, "localhost", 33343));
int maxRetries = 10;
Metrics m = new Metrics();
@@ -1576,7 +1571,6 @@ public class SenderTest {
TransactionManager transactionManager = new TransactionManager();
setupWithTransactionState(transactionManager);
- client.setNode(new Node(1, "localhost", 33343));
prepareAndReceiveInitProducerId(producerId, Errors.NONE);
assertTrue(transactionManager.hasProducerId());
@@ -1604,7 +1598,6 @@ public class SenderTest {
TransactionManager transactionManager = new TransactionManager();
setupWithTransactionState(transactionManager);
- client.setNode(new Node(1, "localhost", 33343));
prepareAndReceiveInitProducerId(producerId, Errors.NONE);
assertTrue(transactionManager.hasProducerId());
@@ -1647,7 +1640,6 @@ public class SenderTest {
TransactionManager transactionManager = new TransactionManager();
setupWithTransactionState(transactionManager);
- client.setNode(new Node(1, "localhost", 33343));
prepareAndReceiveInitProducerId(producerId, Errors.NONE);
assertTrue(transactionManager.hasProducerId());
@@ -1673,7 +1665,6 @@ public class SenderTest {
TransactionManager transactionManager = new TransactionManager();
setupWithTransactionState(transactionManager);
- client.setNode(new Node(1, "localhost", 33343));
prepareAndReceiveInitProducerId(producerId, Errors.NONE);
assertTrue(transactionManager.hasProducerId());
@@ -1700,7 +1691,6 @@ public class SenderTest {
TransactionManager transactionManager = new TransactionManager();
transactionManager.setProducerIdAndEpoch(new ProducerIdAndEpoch(producerId, (short) 0));
setupWithTransactionState(transactionManager);
- client.setNode(new Node(1, "localhost", 33343));
int maxRetries = 10;
Metrics m = new Metrics();
@@ -1745,7 +1735,6 @@ public class SenderTest {
TransactionManager transactionManager = new TransactionManager();
transactionManager.setProducerIdAndEpoch(new ProducerIdAndEpoch(producerId, (short) 0));
setupWithTransactionState(transactionManager);
- client.setNode(new Node(1, "localhost", 33343));
int maxRetries = 10;
Metrics m = new Metrics();
@@ -1784,7 +1773,6 @@ public class SenderTest {
TransactionManager transactionManager = new TransactionManager();
transactionManager.setProducerIdAndEpoch(new ProducerIdAndEpoch(producerId, (short) 0));
setupWithTransactionState(transactionManager);
- client.setNode(new Node(1, "localhost", 33343));
int maxRetries = 10;
Metrics m = new Metrics();
@@ -1853,8 +1841,8 @@ public class SenderTest {
Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
senderMetrics, time, REQUEST_TIMEOUT, 1000L, txnManager, new ApiVersions());
// Create a two broker cluster, with partition 0 on broker 0 and partition 1 on broker 1
- Cluster cluster1 = TestUtils.clusterWith(2, topic, 2);
- metadata.update(cluster1, Collections.<String>emptySet(), time.milliseconds());
+ MetadataResponse metadataUpdate1 = TestUtils.metadataUpdateWith(2, Collections.singletonMap(topic, 2));
+ client.prepareMetadataUpdate(metadataUpdate1);
// Send the first message.
Future<RecordMetadata> f1 =
accumulator.append(tp, 0L, "key1".getBytes(), new byte[batchSize / 2], null, null, MAX_BLOCK_TIMEOUT).future;
@@ -2068,7 +2056,7 @@ public class SenderTest {
@Test
public void testResetNextBatchExpiry() throws Exception {
- client = spy(new MockClient(time));
+ client = spy(new MockClient(time, metadata));
setupWithTransactionState(null);
@@ -2184,25 +2172,20 @@ public class SenderTest {
}
private void setupWithTransactionState(TransactionManager transactionManager, boolean guaranteeOrder, BufferPool customPool) {
+ long deliveryTimeoutMs = 1500L;
long totalSize = 1024 * 1024;
String metricGrpName = "producer-metrics";
- Map<String, String> metricTags = new LinkedHashMap<>();
- metricTags.put("client-id", CLIENT_ID);
- MetricConfig metricConfig = new MetricConfig().tags(metricTags);
+ MetricConfig metricConfig = new MetricConfig().tags(Collections.singletonMap("client-id", CLIENT_ID));
this.metrics = new Metrics(metricConfig, time);
BufferPool pool = (customPool == null) ? new BufferPool(totalSize, batchSize, metrics, time, metricGrpName) : customPool;
- setupWithTransactionState(transactionManager, guaranteeOrder, metricTags, pool);
- }
- private void setupWithTransactionState(TransactionManager transactionManager, boolean guaranteeOrder, Map<String, String> metricTags, BufferPool pool) {
- long deliveryTimeoutMs = 1500L;
- String metricGrpName = "producer-metrics";
this.accumulator = new RecordAccumulator(logContext, batchSize, CompressionType.NONE, 0L, 0L,
- deliveryTimeoutMs, metrics, metricGrpName, time, apiVersions, transactionManager, pool);
+ deliveryTimeoutMs, metrics, metricGrpName, time, apiVersions, transactionManager, pool);
this.senderMetricsRegistry = new SenderMetricsRegistry(this.metrics);
this.sender = new Sender(logContext, this.client, this.metadata, this.accumulator, guaranteeOrder, MAX_REQUEST_SIZE, ACKS_ALL,
- Integer.MAX_VALUE, this.senderMetricsRegistry, this.time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
- this.metadata.update(this.cluster, Collections.emptySet(), time.milliseconds());
+ Integer.MAX_VALUE, this.senderMetricsRegistry, this.time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
+
+ this.client.updateMetadata(TestUtils.metadataUpdateWith(1, Collections.singletonMap("test", 2)));
}
private void assertSendFailure(Class<? extends RuntimeException> expectedError) throws Exception {
@@ -2244,7 +2227,7 @@ public class SenderTest {
}
private void prepareFindCoordinatorResponse(Errors error) {
- client.prepareResponse(new FindCoordinatorResponse(error, cluster.nodes().get(0)));
+ client.prepareResponse(new FindCoordinatorResponse(error, metadata.fetch().nodes().get(0)));
}
private void prepareInitPidResponse(Errors error, long pid, short epoch) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index cf730b9..72c0a0b 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -102,11 +102,10 @@ public class TransactionManagerTest {
private TopicPartition tp0 = new TopicPartition(topic, 0);
private TopicPartition tp1 = new TopicPartition(topic, 1);
private MockTime time = new MockTime();
- private MockClient client = new MockClient(time);
-
private Metadata metadata = new Metadata(0, Long.MAX_VALUE, true, true, new ClusterResourceListeners());
+ private MockClient client = new MockClient(time, metadata);
+
private ApiVersions apiVersions = new ApiVersions();
- private Cluster cluster = TestUtils.singletonCluster("test", 2);
private RecordAccumulator accumulator = null;
private Sender sender = null;
private TransactionManager transactionManager = null;
@@ -133,8 +132,7 @@ public class TransactionManagerTest {
new BufferPool(totalSize, batchSize, metrics, time, metricGrpName));
this.sender = new Sender(logContext, this.client, this.metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL,
MAX_RETRIES, senderMetrics, this.time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
- this.metadata.update(this.cluster, Collections.<String>emptySet(), time.milliseconds());
- client.setNode(brokerNode);
+ this.client.updateMetadata(TestUtils.metadataUpdateWith(1, singletonMap("test", 2)));
}
@Test
@@ -2042,7 +2040,7 @@ public class TransactionManagerTest {
time.sleep(10000);
// Disconnect the target node for the pending produce request. This will ensure that sender will try to
// expire the batch.
- Node clusterNode = this.cluster.nodes().get(0);
+ Node clusterNode = metadata.fetch().nodes().get(0);
client.disconnect(clusterNode.idString());
client.blackout(clusterNode, 100);
@@ -2098,7 +2096,7 @@ public class TransactionManagerTest {
time.sleep(10000);
// Disconnect the target node for the pending produce request. This will ensure that sender will try to
// expire the batch.
- Node clusterNode = this.cluster.nodes().get(0);
+ Node clusterNode = metadata.fetch().nodes().get(0);
client.disconnect(clusterNode.idString());
client.blackout(clusterNode, 100);
@@ -2155,7 +2153,7 @@ public class TransactionManagerTest {
time.sleep(10000);
// Disconnect the target node for the pending produce request. This will ensure that sender will try to
// expire the batch.
- Node clusterNode = this.cluster.nodes().get(0);
+ Node clusterNode = metadata.fetch().nodes().get(0);
client.disconnect(clusterNode.idString());
client.blackout(clusterNode, 100);
@@ -2226,7 +2224,7 @@ public class TransactionManagerTest {
time.sleep(10000);
// Disconnect the target node for the pending produce request. This will ensure that sender will try to
// expire the batch.
- Node clusterNode = this.cluster.nodes().get(0);
+ Node clusterNode = metadata.fetch().nodes().get(0);
client.disconnect(clusterNode.idString());
client.blackout(clusterNode, 100);
diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
index afb342b..50d1eeb 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
@@ -21,9 +21,12 @@ import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.network.NetworkReceive;
import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
@@ -42,15 +45,16 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Supplier;
-import java.util.concurrent.ExecutionException;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import static java.util.Arrays.asList;
import static org.junit.Assert.assertEquals;
@@ -70,9 +74,6 @@ public class TestUtils {
public static final String DIGITS = "0123456789";
public static final String LETTERS_AND_DIGITS = LETTERS + DIGITS;
- public static final String GROUP_METADATA_TOPIC_NAME = "__consumer_offsets";
- public static final Set<String> INTERNAL_TOPICS = Collections.singleton(GROUP_METADATA_TOPIC_NAME);
-
/* A consistent random number generator to make tests repeatable */
public static final Random SEEDED_RANDOM = new Random(192348092834L);
public static final Random RANDOM = new Random();
@@ -105,7 +106,52 @@ public class TestUtils {
for (int i = 0; i < partitions; i++)
parts.add(new PartitionInfo(topic, i, ns[i % ns.length], ns, ns));
}
- return new Cluster("kafka-cluster", asList(ns), parts, Collections.<String>emptySet(), INTERNAL_TOPICS);
+ return new Cluster("kafka-cluster", asList(ns), parts, Collections.emptySet(), Topic.INTERNAL_TOPICS);
+ }
+
+ public static MetadataResponse metadataUpdateWith(final int numNodes,
+ final Map<String, Integer> topicPartitionCounts) {
+ return metadataUpdateWith("kafka-cluster", numNodes, topicPartitionCounts);
+ }
+
+ public static MetadataResponse metadataUpdateWith(final String clusterId,
+ final int numNodes,
+ final Map<String, Integer> topicPartitionCounts) {
+ return metadataUpdateWith(clusterId, numNodes, Collections.emptyMap(), topicPartitionCounts);
+ }
+
+ public static MetadataResponse metadataUpdateWith(final String clusterId,
+ final int numNodes,
+ final Map<String, Errors> topicErrors,
+ final Map<String, Integer> topicPartitionCounts) {
+ final List<Node> nodes = new ArrayList<>(numNodes);
+ for (int i = 0; i < numNodes; i++)
+ nodes.add(new Node(i, "localhost", 1969 + i));
+
+ List<MetadataResponse.TopicMetadata> topicMetadata = new ArrayList<>();
+ for (Map.Entry<String, Integer> topicPartitionCountEntry : topicPartitionCounts.entrySet()) {
+ String topic = topicPartitionCountEntry.getKey();
+ int numPartitions = topicPartitionCountEntry.getValue();
+
+ List<MetadataResponse.PartitionMetadata> partitionMetadata = new ArrayList<>(numPartitions);
+ for (int i = 0; i < numPartitions; i++) {
+ Node leader = nodes.get(i % nodes.size());
+ List<Node> replicas = Collections.singletonList(leader);
+ partitionMetadata.add(new MetadataResponse.PartitionMetadata(
+ Errors.NONE, i, leader, Optional.empty(), replicas, replicas, replicas));
+ }
+
+ topicMetadata.add(new MetadataResponse.TopicMetadata(Errors.NONE, topic,
+ Topic.isInternal(topic), partitionMetadata));
+ }
+
+ for (Map.Entry<String, Errors> topicErrorEntry : topicErrors.entrySet()) {
+ String topic = topicErrorEntry.getKey();
+ topicMetadata.add(new MetadataResponse.TopicMetadata(topicErrorEntry.getValue(), topic,
+ Topic.isInternal(topic), Collections.emptyList()));
+ }
+
+ return new MetadataResponse(nodes, clusterId, 0, topicMetadata);
}
public static Cluster clusterWith(final int nodes, final String topic, final int partitions) {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
index cb62fb6..ed48d57 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
@@ -23,7 +23,6 @@ import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
-import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.MetricConfig;
@@ -99,7 +98,7 @@ public class WorkerGroupMember {
List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(
config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG),
config.getString(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG));
- this.metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), 0);
+ this.metadata.bootstrap(addresses, time.milliseconds());
String metricGrpPrefix = "connect";
ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config, time);
NetworkClient netClient = new NetworkClient(
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
index ede6c71..7ccb68c 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
@@ -19,7 +19,6 @@ package org.apache.kafka.connect.runtime.distributed;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
-import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.Errors;
@@ -74,8 +73,7 @@ public class WorkerCoordinatorTest {
private long retryBackoffMs = 100;
private MockTime time;
private MockClient client;
- private Cluster cluster = TestUtils.singletonCluster("topic", 1);
- private Node node = cluster.nodes().get(0);
+ private Node node;
private Metadata metadata;
private Metrics metrics;
private ConsumerNetworkClient consumerClient;
@@ -92,16 +90,15 @@ public class WorkerCoordinatorTest {
LogContext loggerFactory = new LogContext();
this.time = new MockTime();
- this.client = new MockClient(time);
this.metadata = new Metadata(0, Long.MAX_VALUE, true);
- this.metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
+ this.client = new MockClient(time, metadata);
+ this.client.updateMetadata(TestUtils.metadataUpdateWith(1, Collections.singletonMap("topic", 1)));
+ this.node = metadata.fetch().nodes().get(0);
this.consumerClient = new ConsumerNetworkClient(loggerFactory, client, metadata, time, 100, 1000, heartbeatIntervalMs);
this.metrics = new Metrics(time);
this.rebalanceListener = new MockRebalanceListener();
this.configStorage = PowerMock.createMock(KafkaConfigBackingStore.class);
- client.setNode(node);
-
this.coordinator = new WorkerCoordinator(
loggerFactory,
consumerClient,
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
index 5b1e155..26c8e71 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
@@ -27,6 +27,7 @@ import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.requests.CreateTopicsResponse;
+import org.apache.kafka.common.utils.MockTime;
import org.junit.Test;
import java.util.Collections;
@@ -49,8 +50,7 @@ public class TopicAdminTest {
public void returnNullWithApiVersionMismatch() {
final NewTopic newTopic = TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build();
Cluster cluster = createCluster(1);
- try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
- env.kafkaClient().setNode(cluster.controller());
+ try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
env.kafkaClient().prepareResponse(createTopicResponseWithUnsupportedVersion(newTopic));
TopicAdmin admin = new TopicAdmin(null, env.adminClient());
@@ -63,8 +63,7 @@ public class TopicAdminTest {
public void returnNullWithClusterAuthorizationFailure() {
final NewTopic newTopic = TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build();
Cluster cluster = createCluster(1);
- try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
- env.kafkaClient().setNode(cluster.nodes().iterator().next());
+ try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) {
env.kafkaClient().prepareResponse(createTopicResponseWithClusterAuthorizationException(newTopic));
TopicAdmin admin = new TopicAdmin(null, env.adminClient());
boolean created = admin.createTopic(newTopic);
diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala
index 5e0f7c8..ae2ffb1 100644
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -439,8 +439,7 @@ object AdminClient {
val brokerUrls = config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)
val clientDnsLookup = config.getString(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG)
val brokerAddresses = ClientUtils.parseAndValidateAddresses(brokerUrls, clientDnsLookup)
- val bootstrapCluster = Cluster.bootstrap(brokerAddresses)
- metadata.update(bootstrapCluster, Collections.emptySet(), 0)
+ metadata.bootstrap(brokerAddresses, time.milliseconds())
val clientId = "admin-" + AdminClientIdSequence.getAndIncrement()
@@ -482,6 +481,6 @@ object AdminClient {
requestTimeoutMs,
retryBackoffMs,
highLevelClient,
- bootstrapCluster.nodes.asScala.toList)
+ metadata.fetch.nodes.asScala.toList)
}
}
diff --git a/core/src/test/scala/unit/kafka/server/epoch/util/ReplicaFetcherMockBlockingSend.scala b/core/src/test/scala/unit/kafka/server/epoch/util/ReplicaFetcherMockBlockingSend.scala
index b7c037e..8c7e0dc 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/util/ReplicaFetcherMockBlockingSend.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/util/ReplicaFetcherMockBlockingSend.scala
@@ -16,8 +16,12 @@
*/
package kafka.server.epoch.util
+import java.util
+import java.util.Collections
+
import kafka.cluster.BrokerEndPoint
import kafka.server.BlockingSend
+import org.apache.kafka.clients.MockClient.MockMetadataUpdater
import org.apache.kafka.clients.{ClientRequest, ClientResponse, MockClient}
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record.Records
@@ -35,7 +39,11 @@ import org.apache.kafka.common.{Node, TopicPartition}
* setOffsetsForNextResponse
*/
class ReplicaFetcherMockBlockingSend(offsets: java.util.Map[TopicPartition, EpochEndOffset], destination: BrokerEndPoint, time: Time) extends BlockingSend {
- private val client = new MockClient(new SystemTime)
+ private val client = new MockClient(new SystemTime, new MockMetadataUpdater {
+ override def fetchNodes(): util.List[Node] = Collections.emptyList()
+ override def isUpdateNeeded: Boolean = false
+ override def update(time: Time, update: MockClient.MetadataUpdate): Unit = {}
+ })
var fetchCount = 0
var epochFetchCount = 0
var lastUsedOffsetForLeaderEpochVersion = -1
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
index bb199b7..cc9a2ff 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.config.types.Password;
+import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serdes;
@@ -604,9 +605,11 @@ public abstract class AbstractResetIntegrationTest {
private void assertInternalTopicsGotDeleted(final String intermediateUserTopic) throws Exception {
// do not use list topics request, but read from the embedded cluster's zookeeper path directly to confirm
if (intermediateUserTopic != null) {
- cluster.waitForRemainingTopics(30000, INPUT_TOPIC, OUTPUT_TOPIC, OUTPUT_TOPIC_2, OUTPUT_TOPIC_2_RERUN, TestUtils.GROUP_METADATA_TOPIC_NAME, intermediateUserTopic);
+ cluster.waitForRemainingTopics(30000, INPUT_TOPIC, OUTPUT_TOPIC, OUTPUT_TOPIC_2, OUTPUT_TOPIC_2_RERUN,
+ Topic.GROUP_METADATA_TOPIC_NAME, intermediateUserTopic);
} else {
- cluster.waitForRemainingTopics(30000, INPUT_TOPIC, OUTPUT_TOPIC, OUTPUT_TOPIC_2, OUTPUT_TOPIC_2_RERUN, TestUtils.GROUP_METADATA_TOPIC_NAME);
+ cluster.waitForRemainingTopics(30000, INPUT_TOPIC, OUTPUT_TOPIC, OUTPUT_TOPIC_2, OUTPUT_TOPIC_2_RERUN,
+ Topic.GROUP_METADATA_TOPIC_NAME);
}
}
}