You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2016/06/06 18:56:09 UTC
kafka git commit: KAFKA-2948;
Remove unused topics from producer metadata set
Repository: kafka
Updated Branches:
refs/heads/trunk 79aaf19f2 -> 0cee0c321
KAFKA-2948; Remove unused topics from producer metadata set
If no messages are sent to a topic during the last refresh interval or if UNKNOWN_TOPIC_OR_PARTITION error is received, remove the topic from the metadata list. Topics are added to the list on the next attempt to send a message to the topic.
Author: Rajini Sivaram <ra...@googlemail.com>
Author: rsivaram <rs...@uk.ibm.com>
Reviewers: Jason Gustafson <ja...@confluent.io>, Ewen Cheslack-Postava <ew...@confluent.io>, Ismael Juma <is...@juma.me.uk>
Closes #645 from rajinisivaram/KAFKA-2948
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0cee0c32
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0cee0c32
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0cee0c32
Branch: refs/heads/trunk
Commit: 0cee0c321897b4fca4409651fdf28188870cb2f0
Parents: 79aaf19
Author: Rajini Sivaram <ra...@googlemail.com>
Authored: Mon Jun 6 19:53:53 2016 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Mon Jun 6 19:55:50 2016 +0100
----------------------------------------------------------------------
.../java/org/apache/kafka/clients/Metadata.java | 64 ++++++++++++++++----
.../kafka/clients/producer/KafkaProducer.java | 8 +--
.../producer/internals/RecordAccumulator.java | 20 +++---
.../clients/producer/internals/Sender.java | 8 ++-
.../kafka/common/requests/MetadataResponse.java | 17 ++++--
.../org/apache/kafka/clients/MetadataTest.java | 63 +++++++++++++++++++
.../internals/ConsumerCoordinatorTest.java | 31 ++++++++++
.../clients/producer/internals/SenderTest.java | 39 +++++++++++-
8 files changed, 216 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/0cee0c32/clients/src/main/java/org/apache/kafka/clients/Metadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index 322ae0f..54b19a3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -15,9 +15,13 @@ package org.apache.kafka.clients;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.Set;
+
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
@@ -32,11 +36,18 @@ import org.slf4j.LoggerFactory;
*
* Metadata is maintained for only a subset of topics, which can be added to over time. When we request metadata for a
* topic we don't have any metadata for it will trigger a metadata update.
+ * <p>
+ * If topic expiry is enabled for the metadata, any topic that has not been used within the expiry interval
+ * is removed from the metadata refresh set after an update. Consumers disable topic expiry since they explicitly
+ * manage topics while producers rely on topic expiry to limit the refresh set.
*/
public final class Metadata {
private static final Logger log = LoggerFactory.getLogger(Metadata.class);
+ public static final long TOPIC_EXPIRY_MS = 5 * 60 * 1000;
+ private static final long TOPIC_EXPIRY_NEEDS_UPDATE = -1L;
+
private final long refreshBackoffMs;
private final long metadataExpireMs;
private int version;
@@ -44,9 +55,11 @@ public final class Metadata {
private long lastSuccessfulRefreshMs;
private Cluster cluster;
private boolean needUpdate;
- private final Set<String> topics;
+ /* Topics with expiry time */
+ private final Map<String, Long> topics;
private final List<Listener> listeners;
private boolean needMetadataForAllTopics;
+ private final boolean topicExpiryEnabled;
/**
* Create a metadata instance with reasonable defaults
@@ -55,21 +68,27 @@ public final class Metadata {
this(100L, 60 * 60 * 1000L);
}
+ public Metadata(long refreshBackoffMs, long metadataExpireMs) {
+ this(refreshBackoffMs, metadataExpireMs, false);
+ }
+
/**
* Create a new Metadata instance
* @param refreshBackoffMs The minimum amount of time that must expire between metadata refreshes to avoid busy
* polling
* @param metadataExpireMs The maximum amount of time that metadata can be retained without refresh
+ * @param topicExpiryEnabled If true, enable expiry of unused topics
*/
- public Metadata(long refreshBackoffMs, long metadataExpireMs) {
+ public Metadata(long refreshBackoffMs, long metadataExpireMs, boolean topicExpiryEnabled) {
this.refreshBackoffMs = refreshBackoffMs;
this.metadataExpireMs = metadataExpireMs;
+ this.topicExpiryEnabled = topicExpiryEnabled;
this.lastRefreshMs = 0L;
this.lastSuccessfulRefreshMs = 0L;
this.version = 0;
this.cluster = Cluster.empty();
this.needUpdate = false;
- this.topics = new HashSet<String>();
+ this.topics = new HashMap<>();
this.listeners = new ArrayList<>();
this.needMetadataForAllTopics = false;
}
@@ -82,10 +101,11 @@ public final class Metadata {
}
/**
- * Add the topic to maintain in the metadata
+ * Add the topic to maintain in the metadata. If topic expiry is enabled, expiry will
+ * time be reset on the next update.
*/
public synchronized void add(String topic) {
- topics.add(topic);
+ topics.put(topic, TOPIC_EXPIRY_NEEDS_UPDATE);
}
/**
@@ -135,21 +155,24 @@ public final class Metadata {
}
/**
- * Replace the current set of topics maintained to the one provided
+ * Replace the current set of topics maintained to the one provided.
+ * If topic expiry is enabled, expiry time of the topics will be
+ * reset on the next update.
* @param topics
*/
public synchronized void setTopics(Collection<String> topics) {
- if (!this.topics.containsAll(topics))
+ if (!this.topics.keySet().containsAll(topics))
requestUpdate();
this.topics.clear();
- this.topics.addAll(topics);
+ for (String topic : topics)
+ this.topics.put(topic, TOPIC_EXPIRY_NEEDS_UPDATE);
}
/**
* Get the list of topics we are currently maintaining metadata for
*/
public synchronized Set<String> topics() {
- return new HashSet<String>(this.topics);
+ return new HashSet<>(this.topics.keySet());
}
/**
@@ -158,11 +181,12 @@ public final class Metadata {
* @return true if the topic exists, false otherwise
*/
public synchronized boolean containsTopic(String topic) {
- return this.topics.contains(topic);
+ return this.topics.containsKey(topic);
}
/**
- * Update the cluster metadata
+ * Updates the cluster metadata. If topic expiry is enabled, expiry time
+ * is set for topics if required and expired topics are removed from the metadata.
*/
public synchronized void update(Cluster cluster, long now) {
this.needUpdate = false;
@@ -170,6 +194,20 @@ public final class Metadata {
this.lastSuccessfulRefreshMs = now;
this.version += 1;
+ if (topicExpiryEnabled) {
+ // Handle expiry of topics from the metadata refresh set.
+ for (Iterator<Map.Entry<String, Long>> it = topics.entrySet().iterator(); it.hasNext(); ) {
+ Map.Entry<String, Long> entry = it.next();
+ long expireMs = entry.getValue();
+ if (expireMs == TOPIC_EXPIRY_NEEDS_UPDATE)
+ entry.setValue(now + TOPIC_EXPIRY_MS);
+ else if (expireMs <= now) {
+ it.remove();
+ log.debug("Removing unused topic {} from the metadata list, expiryMs {} now {}", entry.getKey(), expireMs, now);
+ }
+ }
+ }
+
for (Listener listener: listeners)
listener.onMetadataUpdate(cluster);
@@ -251,9 +289,9 @@ public final class Metadata {
List<Node> nodes = Collections.emptyList();
if (cluster != null) {
unauthorizedTopics.addAll(cluster.unauthorizedTopics());
- unauthorizedTopics.retainAll(this.topics);
+ unauthorizedTopics.retainAll(this.topics.keySet());
- for (String topic : this.topics) {
+ for (String topic : this.topics.keySet()) {
partitionInfos.addAll(cluster.partitionsForTopic(topic));
}
nodes = cluster.nodes();
http://git-wip-us.apache.org/repos/asf/kafka/blob/0cee0c32/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 91697c1..a1bdb42 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -224,7 +224,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
this.metrics = new Metrics(metricConfig, reporters, time);
this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);
long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
- this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG));
+ this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG), true);
this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
@@ -511,10 +511,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
* @return The amount of time we waited in ms
*/
private long waitOnMetadata(String topic, long maxWaitMs) throws InterruptedException {
- // add topic to metadata topic list if it is not there already.
- if (!this.metadata.containsTopic(topic))
- this.metadata.add(topic);
-
+ // add topic to metadata topic list if it is not there already and reset expiry
+ this.metadata.add(topic);
if (metadata.fetch().partitionsForTopic(topic) != null)
return 0;
http://git-wip-us.apache.org/repos/asf/kafka/blob/0cee0c32/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index a73d882..fa1e513 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -299,7 +299,7 @@ public final class RecordAccumulator {
public ReadyCheckResult ready(Cluster cluster, long nowMs) {
Set<Node> readyNodes = new HashSet<>();
long nextReadyCheckDelayMs = Long.MAX_VALUE;
- boolean unknownLeadersExist = false;
+ Set<String> unknownLeaderTopics = new HashSet<>();
boolean exhausted = this.free.queued() > 0;
for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) {
@@ -307,10 +307,12 @@ public final class RecordAccumulator {
Deque<RecordBatch> deque = entry.getValue();
Node leader = cluster.leaderFor(part);
- if (leader == null) {
- unknownLeadersExist = true;
- } else if (!readyNodes.contains(leader) && !muted.contains(part)) {
- synchronized (deque) {
+ synchronized (deque) {
+ if (leader == null && !deque.isEmpty()) {
+ // This is a partition for which leader is not known, but messages are available to send.
+ // Note that entries are currently not removed from batches when deque is empty.
+ unknownLeaderTopics.add(part.topic());
+ } else if (!readyNodes.contains(leader) && !muted.contains(part)) {
RecordBatch batch = deque.peekFirst();
if (batch != null) {
boolean backingOff = batch.attempts > 0 && batch.lastAttemptMs + retryBackoffMs > nowMs;
@@ -333,7 +335,7 @@ public final class RecordAccumulator {
}
}
- return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeadersExist);
+ return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics);
}
/**
@@ -549,12 +551,12 @@ public final class RecordAccumulator {
public final static class ReadyCheckResult {
public final Set<Node> readyNodes;
public final long nextReadyCheckDelayMs;
- public final boolean unknownLeadersExist;
+ public final Set<String> unknownLeaderTopics;
- public ReadyCheckResult(Set<Node> readyNodes, long nextReadyCheckDelayMs, boolean unknownLeadersExist) {
+ public ReadyCheckResult(Set<Node> readyNodes, long nextReadyCheckDelayMs, Set<String> unknownLeaderTopics) {
this.readyNodes = readyNodes;
this.nextReadyCheckDelayMs = nextReadyCheckDelayMs;
- this.unknownLeadersExist = unknownLeadersExist;
+ this.unknownLeaderTopics = unknownLeaderTopics;
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0cee0c32/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 29077b6..f1852b5 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -175,8 +175,14 @@ public class Sender implements Runnable {
RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
// if there are any partitions whose leaders are not known yet, force metadata update
- if (result.unknownLeadersExist)
+ if (!result.unknownLeaderTopics.isEmpty()) {
+ // The set of topics with unknown leader contains topics with leader election pending as well as
+ // topics which may have expired. Add the topic again to metadata to ensure it is included
+ // and request metadata update, since there are messages to send to the topic.
+ for (String topic : result.unknownLeaderTopics)
+ this.metadata.add(topic);
this.metadata.requestUpdate();
+ }
// remove any nodes we aren't ready to send to
Iterator<Node> iter = result.readyNodes.iterator();
http://git-wip-us.apache.org/repos/asf/kafka/blob/0cee0c32/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
index 09a5bee..78b35f8 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
@@ -236,11 +236,22 @@ public class MetadataResponse extends AbstractRequestResponse {
}
/**
+ * Returns the set of topics with the specified error
+ */
+ public Set<String> topicsByError(Errors error) {
+ Set<String> errorTopics = new HashSet<>();
+ for (TopicMetadata metadata : topicMetadata) {
+ if (metadata.error == error)
+ errorTopics.add(metadata.topic());
+ }
+ return errorTopics;
+ }
+
+ /**
* Get a snapshot of the cluster metadata from this response
* @return the cluster snapshot
*/
public Cluster cluster() {
- Set<String> unauthorizedTopics = new HashSet<>();
List<PartitionInfo> partitions = new ArrayList<>();
for (TopicMetadata metadata : topicMetadata) {
if (metadata.error == Errors.NONE) {
@@ -251,12 +262,10 @@ public class MetadataResponse extends AbstractRequestResponse {
partitionMetadata.leader,
partitionMetadata.replicas.toArray(new Node[0]),
partitionMetadata.isr.toArray(new Node[0])));
- } else if (metadata.error == Errors.TOPIC_AUTHORIZATION_FAILED) {
- unauthorizedTopics.add(metadata.topic);
}
}
- return new Cluster(this.brokers, partitions, unauthorizedTopics);
+ return new Cluster(this.brokers, partitions, topicsByError(Errors.TOPIC_AUTHORIZATION_FAILED));
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/0cee0c32/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
index 0493eb2..5defb13 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
@@ -204,6 +204,69 @@ public class MetadataTest {
new HashSet<>(Arrays.asList("topic", "topic1")), topics);
}
+ @Test
+ public void testTopicExpiry() throws Exception {
+ metadata = new Metadata(refreshBackoffMs, metadataExpireMs, true);
+
+ // Test that topic is expired if not used within the expiry interval
+ long time = 0;
+ metadata.add("topic1");
+ metadata.update(Cluster.empty(), time);
+ time += Metadata.TOPIC_EXPIRY_MS;
+ metadata.update(Cluster.empty(), time);
+ assertFalse("Unused topic not expired", metadata.containsTopic("topic1"));
+
+ // Test that topic is not expired if used within the expiry interval
+ metadata.add("topic2");
+ metadata.update(Cluster.empty(), time);
+ for (int i = 0; i < 3; i++) {
+ time += Metadata.TOPIC_EXPIRY_MS / 2;
+ metadata.update(Cluster.empty(), time);
+ assertTrue("Topic expired even though in use", metadata.containsTopic("topic2"));
+ metadata.add("topic2");
+ }
+
+ // Test that topics added using setTopics expire
+ HashSet<String> topics = new HashSet<>();
+ topics.add("topic4");
+ metadata.setTopics(topics);
+ metadata.update(Cluster.empty(), time);
+ time += Metadata.TOPIC_EXPIRY_MS;
+ metadata.update(Cluster.empty(), time);
+ assertFalse("Unused topic not expired", metadata.containsTopic("topic4"));
+ }
+
+ @Test
+ public void testNonExpiringMetadata() throws Exception {
+ metadata = new Metadata(refreshBackoffMs, metadataExpireMs, false);
+
+ // Test that topic is not expired if not used within the expiry interval
+ long time = 0;
+ metadata.add("topic1");
+ metadata.update(Cluster.empty(), time);
+ time += Metadata.TOPIC_EXPIRY_MS;
+ metadata.update(Cluster.empty(), time);
+ assertTrue("Unused topic expired when expiry disabled", metadata.containsTopic("topic1"));
+
+ // Test that topic is not expired if used within the expiry interval
+ metadata.add("topic2");
+ metadata.update(Cluster.empty(), time);
+ for (int i = 0; i < 3; i++) {
+ time += Metadata.TOPIC_EXPIRY_MS / 2;
+ metadata.update(Cluster.empty(), time);
+ assertTrue("Topic expired even though in use", metadata.containsTopic("topic2"));
+ metadata.add("topic2");
+ }
+
+ // Test that topics added using setTopics don't expire
+ HashSet<String> topics = new HashSet<>();
+ topics.add("topic4");
+ metadata.setTopics(topics);
+ time += metadataExpireMs * 2;
+ metadata.update(Cluster.empty(), time);
+ assertTrue("Unused topic expired when expiry disabled", metadata.containsTopic("topic4"));
+ }
+
private Thread asyncFetch(final String topic) {
Thread thread = new Thread() {
public void run() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/0cee0c32/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index fc5c929..040824f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -1113,6 +1113,37 @@ public class ConsumerCoordinatorTest {
}
}
+ @Test
+ public void testMetadataTopicsExpiryDisabled() {
+ final String consumerId = "consumer";
+
+ subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
+ HashSet<String> topics = new HashSet<>();
+ topics.add(topicName);
+ metadata.setTopics(topics);
+ subscriptions.needReassignment();
+
+ client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+ coordinator.ensureCoordinatorReady();
+
+ client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code()));
+ client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
+ coordinator.ensurePartitionAssignment();
+
+ metadata.update(TestUtils.singletonCluster(topicName, 2), time.milliseconds());
+ assertTrue("Topic not found in metadata", metadata.containsTopic(topicName));
+ time.sleep(Metadata.TOPIC_EXPIRY_MS * 2);
+ metadata.update(TestUtils.singletonCluster(topicName, 2), time.milliseconds());
+ assertTrue("Topic expired", metadata.containsTopic(topicName));
+ metadata.update(TestUtils.singletonCluster(topicName, 2), time.milliseconds());
+ metadata.update(Cluster.empty(), time.milliseconds());
+ assertTrue("Topic expired", metadata.containsTopic(topicName));
+
+ assertTrue(subscriptions.partitionAssignmentNeeded());
+ metadata.update(TestUtils.singletonCluster(topicName, 2), time.milliseconds());
+ assertTrue(subscriptions.partitionAssignmentNeeded());
+ }
+
private ConsumerCoordinator buildCoordinator(Metrics metrics,
List<PartitionAssignor> assignors,
boolean excludeInternalTopics,
http://git-wip-us.apache.org/repos/asf/kafka/blob/0cee0c32/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index fb67747..b8a086b 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -13,8 +13,8 @@
package org.apache.kafka.clients.producer.internals;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.util.Collections;
@@ -60,7 +60,7 @@ public class SenderTest {
private MockTime time = new MockTime();
private MockClient client = new MockClient(time);
private int batchSize = 16 * 1024;
- private Metadata metadata = new Metadata(0, Long.MAX_VALUE);
+ private Metadata metadata = new Metadata(0, Long.MAX_VALUE, true);
private Cluster cluster = TestUtils.singletonCluster("test", 1);
private Metrics metrics = null;
private RecordAccumulator accumulator = null;
@@ -226,7 +226,42 @@ public class SenderTest {
} finally {
m.close();
}
+ }
+
+ /**
+ * Tests that topics are added to the metadata list when messages are available to send
+ * and expired if not used during a metadata refresh interval.
+ */
+ @Test
+ public void testMetadataTopicExpiry() throws Exception {
+ long offset = 0;
+ metadata.update(Cluster.empty(), time.milliseconds());
+
+ Future<RecordMetadata> future = accumulator.append(tp, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future;
+ sender.run(time.milliseconds());
+ assertTrue("Topic not added to metadata", metadata.containsTopic(tp.topic()));
+ metadata.update(cluster, time.milliseconds());
+ sender.run(time.milliseconds()); // send produce request
+ client.respond(produceResponse(tp, offset++, Errors.NONE.code(), 0));
+ sender.run(time.milliseconds());
+ assertEquals("Request completed.", 0, client.inFlightRequestCount());
+ sender.run(time.milliseconds());
+ assertTrue("Request should be completed", future.isDone());
+ assertTrue("Topic not retained in metadata list", metadata.containsTopic(tp.topic()));
+ time.sleep(Metadata.TOPIC_EXPIRY_MS);
+ metadata.update(Cluster.empty(), time.milliseconds());
+ assertFalse("Unused topic has not been expired", metadata.containsTopic(tp.topic()));
+ future = accumulator.append(tp, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future;
+ sender.run(time.milliseconds());
+ assertTrue("Topic not added to metadata", metadata.containsTopic(tp.topic()));
+ metadata.update(cluster, time.milliseconds());
+ sender.run(time.milliseconds()); // send produce request
+ client.respond(produceResponse(tp, offset++, Errors.NONE.code(), 0));
+ sender.run(time.milliseconds());
+ assertEquals("Request completed.", 0, client.inFlightRequestCount());
+ sender.run(time.milliseconds());
+ assertTrue("Request should be completed", future.isDone());
}
private void completedWithError(Future<RecordMetadata> future, Errors error) throws Exception {