Posted to jira@kafka.apache.org by "Jason Gustafson (JIRA)" <ji...@apache.org> on 2017/12/19 23:06:00 UTC

[jira] [Commented] (KAFKA-4879) KafkaConsumer.position may hang forever when deleting a topic

    [ https://issues.apache.org/jira/browse/KAFKA-4879?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16297562#comment-16297562 ] 

Jason Gustafson commented on KAFKA-4879:
----------------------------------------

[~baluchicken] It looks like progress on this issue has stalled. Do you mind if I pick it up?

I'm not sure we've reached consensus on the solution. The underlying issue is that the consumer blocks while looking up starting offsets for its assigned partitions. If we are in {{poll()}}, then we will be stuck there until the partition is re-created. If we are in {{position()}} or one of the relative {{seek()}} methods, we will be similarly stuck.

1. In {{poll()}}, if we can't find the starting offset for a partition, I think we ought to begin fetching for the other partitions and periodically recheck metadata in the background. If enough time passes and an assigned partition still doesn't exist, we can potentially let the leader of the group trigger a rebalance. One open question is whether we can propagate an {{UnknownTopicOrPartitionException}} back to the user at any point, but I think we can punt on this problem for now.

2. I think for the other methods, we have a choice between adding overloaded methods that accept a timeout parameter or adding a config like {{max.block.ms}} to match the producer. I'm somewhat partial to the first one since it is more flexible. If we're going to do a KIP for this, we may as well cover {{commitSync()}} and some of the other blocking APIs as well.
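
To illustrate the shape of the first option (an overload that accepts a timeout), here is a minimal sketch of a bounded retry loop. It is not Kafka client code: {{BoundedLookup}}, {{awaitValue}}, {{timeoutMs}} and {{retryBackoffMs}} are hypothetical names, and the {{Supplier}} merely stands in for an offset lookup that keeps failing while the topic does not exist.

```java
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical illustration only; none of these names exist in the Kafka client.
public class BoundedLookup {

    // Retries `lookup` until it yields a value or `timeoutMs` elapses,
    // instead of retrying forever as a Long.MAX_VALUE timeout would.
    public static <T> T awaitValue(Supplier<Optional<T>> lookup,
                                   long timeoutMs,
                                   long retryBackoffMs)
            throws TimeoutException, InterruptedException {
        long deadlineNs = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (true) {
            Optional<T> result = lookup.get();
            if (result.isPresent()) {
                return result.get();
            }
            long remainingNs = deadlineNs - System.nanoTime();
            if (remainingNs <= 0) {
                throw new TimeoutException("no value after " + timeoutMs + " ms");
            }
            // Back off before the next attempt, but never sleep past the deadline.
            long remainingMs = Math.max(1, TimeUnit.NANOSECONDS.toMillis(remainingNs));
            Thread.sleep(Math.min(retryBackoffMs, remainingMs));
        }
    }

    public static void main(String[] args) throws Exception {
        // A lookup that never succeeds, standing in for a deleted topic.
        try {
            awaitValue(() -> Optional.<Long>empty(), 200, 50);
        } catch (TimeoutException e) {
            System.out.println("gave up: " + e.getMessage());
        }

        // A lookup that succeeds on the third attempt.
        AtomicInteger attempts = new AtomicInteger();
        Supplier<Optional<Long>> flaky = () ->
                attempts.incrementAndGet() < 3 ? Optional.<Long>empty() : Optional.of(42L);
        long offset = awaitValue(flaky, 1000, 10);
        System.out.println("offset: " + offset);
    }
}
```

The caller picks the bound per call, which is the flexibility argument for overloads; a {{max.block.ms}}-style config would instead supply one default value for {{timeoutMs}} across all blocking calls.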

Thoughts?

> KafkaConsumer.position may hang forever when deleting a topic
> -------------------------------------------------------------
>
>                 Key: KAFKA-4879
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4879
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 0.10.2.0
>            Reporter: Shixiong Zhu
>            Assignee: Balint Molnar
>             Fix For: 1.1.0
>
>
> KafkaConsumer.position may hang forever when deleting a topic. The problem is this line: https://github.com/apache/kafka/blob/022bf129518e33e165f9ceefc4ab9e622952d3bd/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L374
> The timeout is "Long.MAX_VALUE", so it will just retry forever on UnknownTopicOrPartitionException.
> Here is a reproducer:
> {code}
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.common.TopicPartition;
> import org.apache.kafka.common.serialization.StringDeserializer;
> import java.util.Collections;
> import java.util.Properties;
> import java.util.Set;
> public class KafkaReproducer {
>   public static void main(String[] args) {
>     // Make sure "delete.topic.enable" is set to true.
>     // Please create the topic test with "3" partitions manually.
>     // The issue is gone when there is only one partition.
>     String topic = "test";
>     Properties props = new Properties();
>     props.put("bootstrap.servers", "localhost:9092");
>     props.put("group.id", "testgroup");
>     props.put("value.deserializer", StringDeserializer.class.getName());
>     props.put("key.deserializer", StringDeserializer.class.getName());
>     props.put("enable.auto.commit", "false");
>     KafkaConsumer<String, String> kc = new KafkaConsumer<>(props);
>     kc.subscribe(Collections.singletonList(topic));
>     kc.poll(0);
>     Set<TopicPartition> partitions = kc.assignment();
>     System.out.println("partitions: " + partitions);
>     kc.pause(partitions);
>     kc.seekToEnd(partitions);
>     System.out.println("please delete the topic in 30 seconds");
>     try {
>       // Sleep 30 seconds to give us enough time to delete the topic.
>       Thread.sleep(30000);
>     } catch (InterruptedException e) {
>       e.printStackTrace();
>     }
>     System.out.println("sleep end");
>     for (TopicPartition p : partitions) {
>       System.out.println(p + " offset: " + kc.position(p));
>     }
>     System.out.println("cannot reach here");
>     kc.close();
>   }
> }
> {code}


