You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2017/04/27 22:01:39 UTC

kafka git commit: KAFKA-5086: Update topic expiry time in Metadata every time the topic metadata is requested

Repository: kafka
Updated Branches:
  refs/heads/trunk a82f194b2 -> 5b5efd4b5


KAFKA-5086: Update topic expiry time in Metadata every time the topic metadata is requested

Update topic expiry time after every metadata update to handle max.block.ms greater than 5 minutes

Author: Dong Lin <li...@gmail.com>

Reviewers: Rajini Sivaram <ra...@googlemail.com>, Manikumar Reddy <ma...@gmail.com>, Jiangjie Qin <be...@gmail.com>

Closes #2869 from lindong28/KAFKA-5086


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5b5efd4b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5b5efd4b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5b5efd4b

Branch: refs/heads/trunk
Commit: 5b5efd4b5cb092813d878b080e0657f40fc1f72c
Parents: a82f194
Author: Dong Lin <li...@gmail.com>
Authored: Thu Apr 27 23:01:01 2017 +0100
Committer: Rajini Sivaram <ra...@googlemail.com>
Committed: Thu Apr 27 23:01:01 2017 +0100

----------------------------------------------------------------------
 .../kafka/clients/producer/KafkaProducer.java   |  1 +
 .../clients/producer/KafkaProducerTest.java     | 40 ++++++++++++++++++++
 2 files changed, 41 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/5b5efd4b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 3745aba..e0d9938 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -761,6 +761,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
         // is stale and the number of partitions for this topic has increased in the meantime.
         do {
             log.trace("Requesting metadata update for topic {}.", topic);
+            metadata.add(topic);
             int version = metadata.requestUpdate();
             sender.wakeup();
             try {

http://git-wip-us.apache.org/repos/asf/kafka/blob/5b5efd4b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index 53e0d08..819f15e 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -23,9 +23,13 @@ import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.internals.ClusterResourceListeners;
 import org.apache.kafka.common.network.Selectable;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.test.MockMetricsReporter;
 import org.apache.kafka.test.MockProducerInterceptor;
 import org.apache.kafka.test.MockSerializer;
@@ -308,4 +312,40 @@ public class KafkaProducerTest {
         PowerMock.verify(metadata);
     }
 
+    @Test
+    public void testTopicRefreshInMetadata() throws Exception { // checks the topic is still tracked by Metadata after partitionsFor() times out
+        Properties props = new Properties();
+        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        props.setProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "600000"); // 600000 ms = 10 minutes
+        KafkaProducer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
+        long refreshBackoffMs = 500L;
+        long metadataExpireMs = 60000L;
+        final Metadata metadata = new Metadata(refreshBackoffMs, metadataExpireMs, true, new ClusterResourceListeners());
+        final Time time = new MockTime(); // presumably a clock that only moves when time.sleep(...) is called - confirm against MockTime
+        MemberModifier.field(KafkaProducer.class, "metadata").set(producer, metadata); // replace the producer's "metadata" field with the instance built above
+        MemberModifier.field(KafkaProducer.class, "time").set(producer, time); // replace the producer's "time" field with the mock clock
+        final String topic = "topic";
+
+        Thread t = new Thread() { // helper thread that answers the producer's metadata update requests
+            @Override
+            public void run() {
+                long startTimeMs = System.currentTimeMillis(); // real wall-clock start, captured once for all iterations
+                for (int i = 0; i < 10; i++) {
+                    while (!metadata.updateRequested() && System.currentTimeMillis() - startTimeMs < 1000) // spin until an update is requested or 1s of real time has passed since thread start
+                        yield();
+                    metadata.update(Cluster.empty(), Collections.singleton(topic), time.milliseconds()); // publish an empty Cluster as the update, passing the topic as the second argument
+                    time.sleep(60 * 1000L); // 60s on the mock clock per round; 10 rounds add up to 600s, the configured max.block.ms
+                }
+            }
+        };
+        t.start();
+        try {
+            producer.partitionsFor(topic);
+            fail("Expect TimeoutException"); // reached only if partitionsFor() returned without throwing
+        } catch (TimeoutException e) {
+            // expected outcome: the lookup timed out, nothing further to do here
+        }
+        Assert.assertTrue("Topic should still exist in metadata", metadata.containsTopic(topic));
+    }
+
 }