Posted to dev@kafka.apache.org by "Ismael Juma (JIRA)" <ji...@apache.org> on 2017/06/15 15:20:00 UTC
[jira] [Updated] (KAFKA-4879) KafkaConsumer.position may hang forever when deleting a topic
[ https://issues.apache.org/jira/browse/KAFKA-4879?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ismael Juma updated KAFKA-4879:
-------------------------------
Fix Version/s: 0.11.1.0
> KafkaConsumer.position may hang forever when deleting a topic
> -------------------------------------------------------------
>
> Key: KAFKA-4879
> URL: https://issues.apache.org/jira/browse/KAFKA-4879
> Project: Kafka
> Issue Type: Bug
> Components: consumer
> Affects Versions: 0.10.2.0
> Reporter: Shixiong Zhu
> Fix For: 0.11.1.0
>
>
> KafkaConsumer.position may hang forever if the topic is deleted while the consumer is running. The cause is this line: https://github.com/apache/kafka/blob/022bf129518e33e165f9ceefc4ab9e622952d3bd/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L374
> The timeout there is "Long.MAX_VALUE", so the consumer keeps retrying forever on UnknownTopicOrPartitionException.
> Here is a reproducer:
> {code}
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.common.TopicPartition;
> import org.apache.kafka.common.serialization.StringDeserializer;
>
> import java.util.Collections;
> import java.util.Properties;
> import java.util.Set;
>
> public class KafkaReproducer {
>     public static void main(String[] args) {
>         // Make sure "delete.topic.enable" is set to true on the broker.
>         // Create the topic "test" with 3 partitions manually beforehand.
>         // The issue does not occur when there is only one partition.
>         String topic = "test";
>         Properties props = new Properties();
>         props.put("bootstrap.servers", "localhost:9092");
>         props.put("group.id", "testgroup");
>         props.put("value.deserializer", StringDeserializer.class.getName());
>         props.put("key.deserializer", StringDeserializer.class.getName());
>         props.put("enable.auto.commit", "false");
>         KafkaConsumer<String, String> kc = new KafkaConsumer<>(props);
>         kc.subscribe(Collections.singletonList(topic));
>         // Join the group so that partitions get assigned.
>         kc.poll(0);
>         Set<TopicPartition> partitions = kc.assignment();
>         System.out.println("partitions: " + partitions);
>         kc.pause(partitions);
>         kc.seekToEnd(partitions);
>         System.out.println("please delete the topic in 30 seconds");
>         try {
>             // Sleep 30 seconds to give us enough time to delete the topic.
>             Thread.sleep(30000);
>         } catch (InterruptedException e) {
>             e.printStackTrace();
>         }
>         System.out.println("sleep end");
>         for (TopicPartition p : partitions) {
>             // Hangs here once the topic has been deleted.
>             System.out.println(p + " offset: " + kc.position(p));
>         }
>         System.out.println("cannot reach here");
>         kc.close();
>     }
> }
> {code}
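
The retry behaviour described in the issue can be illustrated without a broker. The following is a minimal sketch, not Kafka's actual Fetcher code: `BoundedRetrySketch`, `retryUntil`, `RetriableLookupException`, and the simulated lookup are hypothetical names introduced only for illustration. It assumes a loop that retries on a retriable error until a timeout elapses. With a timeout of Long.MAX_VALUE the deadline check can never fire, so a permanently failing lookup (such as one against a deleted topic) retries forever; with a finite timeout the caller gets an exception instead.

```java
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

public class BoundedRetrySketch {
    /** Hypothetical stand-in for a retriable error such as UnknownTopicOrPartitionException. */
    static class RetriableLookupException extends RuntimeException {
        RetriableLookupException(String message) {
            super(message);
        }
    }

    /**
     * Calls lookup repeatedly until it returns a value or timeoutMs has elapsed.
     * Passing Long.MAX_VALUE as timeoutMs makes the deadline check unreachable,
     * which is the "retry forever" situation described in the issue.
     */
    static <T> T retryUntil(Supplier<T> lookup, long timeoutMs, long backoffMs)
            throws TimeoutException, InterruptedException {
        long start = System.nanoTime();
        while (true) {
            try {
                return lookup.get();
            } catch (RetriableLookupException e) {
                long elapsedMs = (System.nanoTime() - start) / 1_000_000L;
                if (elapsedMs >= timeoutMs) {
                    throw new TimeoutException(
                            "gave up after " + elapsedMs + " ms: " + e.getMessage());
                }
                // Back off briefly before the next attempt.
                Thread.sleep(backoffMs);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        // Simulates a deleted topic: every attempt fails with a retriable error.
        Supplier<Long> deletedTopicLookup = () -> {
            throw new RetriableLookupException("unknown topic or partition");
        };
        try {
            retryUntil(deletedTopicLookup, 200L, 20L);
        } catch (TimeoutException e) {
            System.out.println("timed out instead of hanging");
        }
    }
}
```

Whether a finite timeout, a maximum retry count, or surfacing the error to the caller is the right fix for the consumer itself is a design decision for the Kafka code base; the sketch only shows why an unbounded timeout combined with a permanently retriable error never returns.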