You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2017/04/27 22:01:39 UTC
kafka git commit: KAFKA-5086: Update topic expiry time in Metadata every time the topic metadata is requested
Repository: kafka
Updated Branches:
refs/heads/trunk a82f194b2 -> 5b5efd4b5
KAFKA-5086: Update topic expiry time in Metadata every time the topic metadata is requested
Update topic expiry time after every metadata update to handle `max.block.ms` values greater than 5 minutes.
Author: Dong Lin <li...@gmail.com>
Reviewers: Rajini Sivaram <ra...@googlemail.com>, Manikumar Reddy <ma...@gmail.com>, Jiangjie Qin <be...@gmail.com>
Closes #2869 from lindong28/KAFKA-5086
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5b5efd4b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5b5efd4b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5b5efd4b
Branch: refs/heads/trunk
Commit: 5b5efd4b5cb092813d878b080e0657f40fc1f72c
Parents: a82f194
Author: Dong Lin <li...@gmail.com>
Authored: Thu Apr 27 23:01:01 2017 +0100
Committer: Rajini Sivaram <ra...@googlemail.com>
Committed: Thu Apr 27 23:01:01 2017 +0100
----------------------------------------------------------------------
.../kafka/clients/producer/KafkaProducer.java | 1 +
.../clients/producer/KafkaProducerTest.java | 40 ++++++++++++++++++++
2 files changed, 41 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/5b5efd4b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 3745aba..e0d9938 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -761,6 +761,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
// is stale and the number of partitions for this topic has increased in the meantime.
do {
log.trace("Requesting metadata update for topic {}.", topic);
+ metadata.add(topic);
int version = metadata.requestUpdate();
sender.wakeup();
try {
http://git-wip-us.apache.org/repos/asf/kafka/blob/5b5efd4b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index 53e0d08..819f15e 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -23,9 +23,13 @@ import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.network.Selectable;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
import org.apache.kafka.test.MockMetricsReporter;
import org.apache.kafka.test.MockProducerInterceptor;
import org.apache.kafka.test.MockSerializer;
@@ -308,4 +312,40 @@ public class KafkaProducerTest {
PowerMock.verify(metadata);
}
+ @Test
+ public void testTopicRefreshInMetadata() throws Exception {
+ Properties props = new Properties();
+ props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+ props.setProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "600000");
+ KafkaProducer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
+ long refreshBackoffMs = 500L;
+ long metadataExpireMs = 60000L;
+ final Metadata metadata = new Metadata(refreshBackoffMs, metadataExpireMs, true, new ClusterResourceListeners());
+ final Time time = new MockTime();
+ MemberModifier.field(KafkaProducer.class, "metadata").set(producer, metadata);
+ MemberModifier.field(KafkaProducer.class, "time").set(producer, time);
+ final String topic = "topic";
+
+ Thread t = new Thread() {
+ @Override
+ public void run() {
+ long startTimeMs = System.currentTimeMillis();
+ for (int i = 0; i < 10; i++) {
+ while (!metadata.updateRequested() && System.currentTimeMillis() - startTimeMs < 1000)
+ yield();
+ metadata.update(Cluster.empty(), Collections.singleton(topic), time.milliseconds());
+ time.sleep(60 * 1000L);
+ }
+ }
+ };
+ t.start();
+ try {
+ producer.partitionsFor(topic);
+ fail("Expect TimeoutException");
+ } catch (TimeoutException e) {
+ // skip
+ }
+ Assert.assertTrue("Topic should still exist in metadata", metadata.containsTopic(topic));
+ }
+
}