You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2016/06/06 18:56:09 UTC

kafka git commit: KAFKA-2948; Remove unused topics from producer metadata set

Repository: kafka
Updated Branches:
  refs/heads/trunk 79aaf19f2 -> 0cee0c321


KAFKA-2948; Remove unused topics from producer metadata set

If no messages are sent to a topic during the last refresh interval or if UNKNOWN_TOPIC_OR_PARTITION error is received, remove the topic from the metadata list. Topics are added to the list on the next attempt to send a message to the topic.

Author: Rajini Sivaram <ra...@googlemail.com>
Author: rsivaram <rs...@uk.ibm.com>

Reviewers: Jason Gustafson <ja...@confluent.io>, Ewen Cheslack-Postava <ew...@confluent.io>, Ismael Juma <is...@juma.me.uk>

Closes #645 from rajinisivaram/KAFKA-2948


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0cee0c32
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0cee0c32
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0cee0c32

Branch: refs/heads/trunk
Commit: 0cee0c321897b4fca4409651fdf28188870cb2f0
Parents: 79aaf19
Author: Rajini Sivaram <ra...@googlemail.com>
Authored: Mon Jun 6 19:53:53 2016 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Mon Jun 6 19:55:50 2016 +0100

----------------------------------------------------------------------
 .../java/org/apache/kafka/clients/Metadata.java | 64 ++++++++++++++++----
 .../kafka/clients/producer/KafkaProducer.java   |  8 +--
 .../producer/internals/RecordAccumulator.java   | 20 +++---
 .../clients/producer/internals/Sender.java      |  8 ++-
 .../kafka/common/requests/MetadataResponse.java | 17 ++++--
 .../org/apache/kafka/clients/MetadataTest.java  | 63 +++++++++++++++++++
 .../internals/ConsumerCoordinatorTest.java      | 31 ++++++++++
 .../clients/producer/internals/SenderTest.java  | 39 +++++++++++-
 8 files changed, 216 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/0cee0c32/clients/src/main/java/org/apache/kafka/clients/Metadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index 322ae0f..54b19a3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -15,9 +15,13 @@ package org.apache.kafka.clients;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
+
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
@@ -32,11 +36,18 @@ import org.slf4j.LoggerFactory;
  * 
  * Metadata is maintained for only a subset of topics, which can be added to over time. When we request metadata for a
  * topic we don't have any metadata for it will trigger a metadata update.
+ * <p>
+ * If topic expiry is enabled for the metadata, any topic that has not been used within the expiry interval
+ * is removed from the metadata refresh set after an update. Consumers disable topic expiry since they explicitly
+ * manage topics while producers rely on topic expiry to limit the refresh set.
  */
 public final class Metadata {
 
     private static final Logger log = LoggerFactory.getLogger(Metadata.class);
 
+    public static final long TOPIC_EXPIRY_MS = 5 * 60 * 1000;
+    private static final long TOPIC_EXPIRY_NEEDS_UPDATE = -1L;
+
     private final long refreshBackoffMs;
     private final long metadataExpireMs;
     private int version;
@@ -44,9 +55,11 @@ public final class Metadata {
     private long lastSuccessfulRefreshMs;
     private Cluster cluster;
     private boolean needUpdate;
-    private final Set<String> topics;
+    /* Topics with expiry time */
+    private final Map<String, Long> topics;
     private final List<Listener> listeners;
     private boolean needMetadataForAllTopics;
+    private final boolean topicExpiryEnabled;
 
     /**
      * Create a metadata instance with reasonable defaults
@@ -55,21 +68,27 @@ public final class Metadata {
         this(100L, 60 * 60 * 1000L);
     }
 
+    public Metadata(long refreshBackoffMs, long metadataExpireMs) {
+        this(refreshBackoffMs, metadataExpireMs, false);
+    }
+
     /**
      * Create a new Metadata instance
      * @param refreshBackoffMs The minimum amount of time that must expire between metadata refreshes to avoid busy
      *        polling
      * @param metadataExpireMs The maximum amount of time that metadata can be retained without refresh
+     * @param topicExpiryEnabled If true, enable expiry of unused topics
      */
-    public Metadata(long refreshBackoffMs, long metadataExpireMs) {
+    public Metadata(long refreshBackoffMs, long metadataExpireMs, boolean topicExpiryEnabled) {
         this.refreshBackoffMs = refreshBackoffMs;
         this.metadataExpireMs = metadataExpireMs;
+        this.topicExpiryEnabled = topicExpiryEnabled;
         this.lastRefreshMs = 0L;
         this.lastSuccessfulRefreshMs = 0L;
         this.version = 0;
         this.cluster = Cluster.empty();
         this.needUpdate = false;
-        this.topics = new HashSet<String>();
+        this.topics = new HashMap<>();
         this.listeners = new ArrayList<>();
         this.needMetadataForAllTopics = false;
     }
@@ -82,10 +101,11 @@ public final class Metadata {
     }
 
     /**
-     * Add the topic to maintain in the metadata
+     * Add the topic to maintain in the metadata. If topic expiry is enabled, expiry
+     * time will be reset on the next update.
      */
     public synchronized void add(String topic) {
-        topics.add(topic);
+        topics.put(topic, TOPIC_EXPIRY_NEEDS_UPDATE);
     }
 
     /**
@@ -135,21 +155,24 @@ public final class Metadata {
     }
 
     /**
-     * Replace the current set of topics maintained to the one provided
+     * Replace the current set of topics maintained to the one provided.
+     * If topic expiry is enabled, expiry time of the topics will be
+     * reset on the next update.
      * @param topics
      */
     public synchronized void setTopics(Collection<String> topics) {
-        if (!this.topics.containsAll(topics))
+        if (!this.topics.keySet().containsAll(topics))
             requestUpdate();
         this.topics.clear();
-        this.topics.addAll(topics);
+        for (String topic : topics)
+            this.topics.put(topic, TOPIC_EXPIRY_NEEDS_UPDATE);
     }
 
     /**
      * Get the list of topics we are currently maintaining metadata for
      */
     public synchronized Set<String> topics() {
-        return new HashSet<String>(this.topics);
+        return new HashSet<>(this.topics.keySet());
     }
 
     /**
@@ -158,11 +181,12 @@ public final class Metadata {
      * @return true if the topic exists, false otherwise
      */
     public synchronized boolean containsTopic(String topic) {
-        return this.topics.contains(topic);
+        return this.topics.containsKey(topic);
     }
 
     /**
-     * Update the cluster metadata
+     * Updates the cluster metadata. If topic expiry is enabled, expiry time
+     * is set for topics if required and expired topics are removed from the metadata.
      */
     public synchronized void update(Cluster cluster, long now) {
         this.needUpdate = false;
@@ -170,6 +194,20 @@ public final class Metadata {
         this.lastSuccessfulRefreshMs = now;
         this.version += 1;
 
+        if (topicExpiryEnabled) {
+            // Handle expiry of topics from the metadata refresh set.
+            for (Iterator<Map.Entry<String, Long>> it = topics.entrySet().iterator(); it.hasNext(); ) {
+                Map.Entry<String, Long> entry = it.next();
+                long expireMs = entry.getValue();
+                if (expireMs == TOPIC_EXPIRY_NEEDS_UPDATE)
+                    entry.setValue(now + TOPIC_EXPIRY_MS);
+                else if (expireMs <= now) {
+                    it.remove();
+                    log.debug("Removing unused topic {} from the metadata list, expiryMs {} now {}", entry.getKey(), expireMs, now);
+                }
+            }
+        }
+
         for (Listener listener: listeners)
             listener.onMetadataUpdate(cluster);
 
@@ -251,9 +289,9 @@ public final class Metadata {
         List<Node> nodes = Collections.emptyList();
         if (cluster != null) {
             unauthorizedTopics.addAll(cluster.unauthorizedTopics());
-            unauthorizedTopics.retainAll(this.topics);
+            unauthorizedTopics.retainAll(this.topics.keySet());
 
-            for (String topic : this.topics) {
+            for (String topic : this.topics.keySet()) {
                 partitionInfos.addAll(cluster.partitionsForTopic(topic));
             }
             nodes = cluster.nodes();

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cee0c32/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 91697c1..a1bdb42 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -224,7 +224,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             this.metrics = new Metrics(metricConfig, reporters, time);
             this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);
             long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
-            this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG));
+            this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG), true);
             this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
             this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
             this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
@@ -511,10 +511,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
      * @return The amount of time we waited in ms
      */
     private long waitOnMetadata(String topic, long maxWaitMs) throws InterruptedException {
-        // add topic to metadata topic list if it is not there already.
-        if (!this.metadata.containsTopic(topic))
-            this.metadata.add(topic);
-
+        // add topic to metadata topic list if it is not there already and reset expiry
+        this.metadata.add(topic);
         if (metadata.fetch().partitionsForTopic(topic) != null)
             return 0;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cee0c32/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index a73d882..fa1e513 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -299,7 +299,7 @@ public final class RecordAccumulator {
     public ReadyCheckResult ready(Cluster cluster, long nowMs) {
         Set<Node> readyNodes = new HashSet<>();
         long nextReadyCheckDelayMs = Long.MAX_VALUE;
-        boolean unknownLeadersExist = false;
+        Set<String> unknownLeaderTopics = new HashSet<>();
 
         boolean exhausted = this.free.queued() > 0;
         for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) {
@@ -307,10 +307,12 @@ public final class RecordAccumulator {
             Deque<RecordBatch> deque = entry.getValue();
 
             Node leader = cluster.leaderFor(part);
-            if (leader == null) {
-                unknownLeadersExist = true;
-            } else if (!readyNodes.contains(leader) && !muted.contains(part)) {
-                synchronized (deque) {
+            synchronized (deque) {
+                if (leader == null && !deque.isEmpty()) {
+                    // This is a partition for which leader is not known, but messages are available to send.
+                    // Note that entries are currently not removed from batches when deque is empty.
+                    unknownLeaderTopics.add(part.topic());
+                } else if (!readyNodes.contains(leader) && !muted.contains(part)) {
                     RecordBatch batch = deque.peekFirst();
                     if (batch != null) {
                         boolean backingOff = batch.attempts > 0 && batch.lastAttemptMs + retryBackoffMs > nowMs;
@@ -333,7 +335,7 @@ public final class RecordAccumulator {
             }
         }
 
-        return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeadersExist);
+        return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics);
     }
 
     /**
@@ -549,12 +551,12 @@ public final class RecordAccumulator {
     public final static class ReadyCheckResult {
         public final Set<Node> readyNodes;
         public final long nextReadyCheckDelayMs;
-        public final boolean unknownLeadersExist;
+        public final Set<String> unknownLeaderTopics;
 
-        public ReadyCheckResult(Set<Node> readyNodes, long nextReadyCheckDelayMs, boolean unknownLeadersExist) {
+        public ReadyCheckResult(Set<Node> readyNodes, long nextReadyCheckDelayMs, Set<String> unknownLeaderTopics) {
             this.readyNodes = readyNodes;
             this.nextReadyCheckDelayMs = nextReadyCheckDelayMs;
-            this.unknownLeadersExist = unknownLeadersExist;
+            this.unknownLeaderTopics = unknownLeaderTopics;
         }
     }
     

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cee0c32/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 29077b6..f1852b5 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -175,8 +175,14 @@ public class Sender implements Runnable {
         RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
 
         // if there are any partitions whose leaders are not known yet, force metadata update
-        if (result.unknownLeadersExist)
+        if (!result.unknownLeaderTopics.isEmpty()) {
+            // The set of topics with unknown leader contains topics with leader election pending as well as
+            // topics which may have expired. Add the topic again to metadata to ensure it is included
+            // and request metadata update, since there are messages to send to the topic.
+            for (String topic : result.unknownLeaderTopics)
+                this.metadata.add(topic);
             this.metadata.requestUpdate();
+        }
 
         // remove any nodes we aren't ready to send to
         Iterator<Node> iter = result.readyNodes.iterator();

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cee0c32/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
index 09a5bee..78b35f8 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
@@ -236,11 +236,22 @@ public class MetadataResponse extends AbstractRequestResponse {
     }
 
     /**
+     * Returns the set of topics with the specified error
+     */
+    public Set<String> topicsByError(Errors error) {
+        Set<String> errorTopics = new HashSet<>();
+        for (TopicMetadata metadata : topicMetadata) {
+            if (metadata.error == error)
+                errorTopics.add(metadata.topic());
+        }
+        return errorTopics;
+    }
+
+    /**
      * Get a snapshot of the cluster metadata from this response
      * @return the cluster snapshot
      */
     public Cluster cluster() {
-        Set<String> unauthorizedTopics = new HashSet<>();
         List<PartitionInfo> partitions = new ArrayList<>();
         for (TopicMetadata metadata : topicMetadata) {
             if (metadata.error == Errors.NONE) {
@@ -251,12 +262,10 @@ public class MetadataResponse extends AbstractRequestResponse {
                             partitionMetadata.leader,
                             partitionMetadata.replicas.toArray(new Node[0]),
                             partitionMetadata.isr.toArray(new Node[0])));
-            } else if (metadata.error == Errors.TOPIC_AUTHORIZATION_FAILED) {
-                unauthorizedTopics.add(metadata.topic);
             }
         }
 
-        return new Cluster(this.brokers, partitions, unauthorizedTopics);
+        return new Cluster(this.brokers, partitions, topicsByError(Errors.TOPIC_AUTHORIZATION_FAILED));
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cee0c32/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
index 0493eb2..5defb13 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
@@ -204,6 +204,69 @@ public class MetadataTest {
             new HashSet<>(Arrays.asList("topic", "topic1")), topics);
     }
 
+    @Test
+    public void testTopicExpiry() throws Exception {
+        metadata = new Metadata(refreshBackoffMs, metadataExpireMs, true);
+
+        // Test that topic is expired if not used within the expiry interval
+        long time = 0;
+        metadata.add("topic1");
+        metadata.update(Cluster.empty(), time);
+        time += Metadata.TOPIC_EXPIRY_MS;
+        metadata.update(Cluster.empty(), time);
+        assertFalse("Unused topic not expired", metadata.containsTopic("topic1"));
+
+        // Test that topic is not expired if used within the expiry interval
+        metadata.add("topic2");
+        metadata.update(Cluster.empty(), time);
+        for (int i = 0; i < 3; i++) {
+            time += Metadata.TOPIC_EXPIRY_MS / 2;
+            metadata.update(Cluster.empty(), time);
+            assertTrue("Topic expired even though in use", metadata.containsTopic("topic2"));
+            metadata.add("topic2");
+        }
+
+        // Test that topics added using setTopics expire
+        HashSet<String> topics = new HashSet<>();
+        topics.add("topic4");
+        metadata.setTopics(topics);
+        metadata.update(Cluster.empty(), time);
+        time += Metadata.TOPIC_EXPIRY_MS;
+        metadata.update(Cluster.empty(), time);
+        assertFalse("Unused topic not expired", metadata.containsTopic("topic4"));
+    }
+
+    @Test
+    public void testNonExpiringMetadata() throws Exception {
+        metadata = new Metadata(refreshBackoffMs, metadataExpireMs, false);
+
+        // Test that topic is not expired if not used within the expiry interval
+        long time = 0;
+        metadata.add("topic1");
+        metadata.update(Cluster.empty(), time);
+        time += Metadata.TOPIC_EXPIRY_MS;
+        metadata.update(Cluster.empty(), time);
+        assertTrue("Unused topic expired when expiry disabled", metadata.containsTopic("topic1"));
+
+        // Test that topic is not expired if used within the expiry interval
+        metadata.add("topic2");
+        metadata.update(Cluster.empty(), time);
+        for (int i = 0; i < 3; i++) {
+            time += Metadata.TOPIC_EXPIRY_MS / 2;
+            metadata.update(Cluster.empty(), time);
+            assertTrue("Topic expired even though in use", metadata.containsTopic("topic2"));
+            metadata.add("topic2");
+        }
+
+        // Test that topics added using setTopics don't expire
+        HashSet<String> topics = new HashSet<>();
+        topics.add("topic4");
+        metadata.setTopics(topics);
+        time += metadataExpireMs * 2;
+        metadata.update(Cluster.empty(), time);
+        assertTrue("Unused topic expired when expiry disabled", metadata.containsTopic("topic4"));
+    }
+
     private Thread asyncFetch(final String topic) {
         Thread thread = new Thread() {
             public void run() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cee0c32/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index fc5c929..040824f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -1113,6 +1113,37 @@ public class ConsumerCoordinatorTest {
         }
     }
 
+    @Test
+    public void testMetadataTopicsExpiryDisabled() {
+        final String consumerId = "consumer";
+
+        subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
+        HashSet<String> topics = new HashSet<>();
+        topics.add(topicName);
+        metadata.setTopics(topics);
+        subscriptions.needReassignment();
+
+        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        coordinator.ensureCoordinatorReady();
+
+        client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code()));
+        client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
+        coordinator.ensurePartitionAssignment();
+
+        metadata.update(TestUtils.singletonCluster(topicName, 2), time.milliseconds());
+        assertTrue("Topic not found in metadata", metadata.containsTopic(topicName));
+        time.sleep(Metadata.TOPIC_EXPIRY_MS * 2);
+        metadata.update(TestUtils.singletonCluster(topicName, 2), time.milliseconds());
+        assertTrue("Topic expired", metadata.containsTopic(topicName));
+        metadata.update(TestUtils.singletonCluster(topicName, 2), time.milliseconds());
+        metadata.update(Cluster.empty(), time.milliseconds());
+        assertTrue("Topic expired", metadata.containsTopic(topicName));
+
+        assertTrue(subscriptions.partitionAssignmentNeeded());
+        metadata.update(TestUtils.singletonCluster(topicName, 2), time.milliseconds());
+        assertTrue(subscriptions.partitionAssignmentNeeded());
+    }
+
     private ConsumerCoordinator buildCoordinator(Metrics metrics,
                                                  List<PartitionAssignor> assignors,
                                                  boolean excludeInternalTopics,

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cee0c32/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index fb67747..b8a086b 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -13,8 +13,8 @@
 package org.apache.kafka.clients.producer.internals;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.util.Collections;
@@ -60,7 +60,7 @@ public class SenderTest {
     private MockTime time = new MockTime();
     private MockClient client = new MockClient(time);
     private int batchSize = 16 * 1024;
-    private Metadata metadata = new Metadata(0, Long.MAX_VALUE);
+    private Metadata metadata = new Metadata(0, Long.MAX_VALUE, true);
     private Cluster cluster = TestUtils.singletonCluster("test", 1);
     private Metrics metrics = null;
     private RecordAccumulator accumulator = null;
@@ -226,7 +226,42 @@ public class SenderTest {
         } finally {
             m.close();
         }
+    }
+
+    /**
+     * Tests that topics are added to the metadata list when messages are available to send
+     * and expired if not used during a metadata refresh interval.
+     */
+    @Test
+    public void testMetadataTopicExpiry() throws Exception {
+        long offset = 0;
+        metadata.update(Cluster.empty(), time.milliseconds());
+
+        Future<RecordMetadata> future = accumulator.append(tp, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future;
+        sender.run(time.milliseconds());
+        assertTrue("Topic not added to metadata", metadata.containsTopic(tp.topic()));
+        metadata.update(cluster, time.milliseconds());
+        sender.run(time.milliseconds());  // send produce request
+        client.respond(produceResponse(tp, offset++, Errors.NONE.code(), 0));
+        sender.run(time.milliseconds());
+        assertEquals("Request completed.", 0, client.inFlightRequestCount());
+        sender.run(time.milliseconds());
+        assertTrue("Request should be completed", future.isDone());
 
+        assertTrue("Topic not retained in metadata list", metadata.containsTopic(tp.topic()));
+        time.sleep(Metadata.TOPIC_EXPIRY_MS);
+        metadata.update(Cluster.empty(), time.milliseconds());
+        assertFalse("Unused topic has not been expired", metadata.containsTopic(tp.topic()));
+        future = accumulator.append(tp, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future;
+        sender.run(time.milliseconds());
+        assertTrue("Topic not added to metadata", metadata.containsTopic(tp.topic()));
+        metadata.update(cluster, time.milliseconds());
+        sender.run(time.milliseconds());  // send produce request
+        client.respond(produceResponse(tp, offset++, Errors.NONE.code(), 0));
+        sender.run(time.milliseconds());
+        assertEquals("Request completed.", 0, client.inFlightRequestCount());
+        sender.run(time.milliseconds());
+        assertTrue("Request should be completed", future.isDone());
     }
 
     private void completedWithError(Future<RecordMetadata> future, Errors error) throws Exception {