Posted to dev@kafka.apache.org by "John Roesler (JIRA)" <ji...@apache.org> on 2018/06/05 23:15:00 UTC

[jira] [Created] (KAFKA-7000) KafkaConsumer.position should wait for assignment metadata

John Roesler created KAFKA-7000:
-----------------------------------

             Summary: KafkaConsumer.position should wait for assignment metadata
                 Key: KAFKA-7000
                 URL: https://issues.apache.org/jira/browse/KAFKA-7000
             Project: Kafka
          Issue Type: Improvement
            Reporter: John Roesler
            Assignee: John Roesler


While updating Kafka Streams to stop using the deprecated Consumer.poll(long), I found that this code unexpectedly throws an exception:
{code:java}
consumer.subscribe(topics);
// consumer.poll(0); <- I've removed this line, which shouldn't be necessary here.

final Set<TopicPartition> partitions = new HashSet<>();
for (final String topic : topics) {
    for (final PartitionInfo partition : consumer.partitionsFor(topic)) {
        partitions.add(new TopicPartition(partition.topic(), partition.partition()));
    }
}

for (final TopicPartition tp : partitions) {
    final long offset = consumer.position(tp);
    committedOffsets.put(tp, offset);
}{code}
Here is the exception:
{code:java}
Exception in thread "main" java.lang.IllegalStateException: You can only check the position for partitions assigned to this consumer.
   at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1620)
   at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1586)
   at org.apache.kafka.streams.tests.EosTestDriver.getCommittedOffsets(EosTestDriver.java:275)
   at org.apache.kafka.streams.tests.EosTestDriver.verify(EosTestDriver.java:148)
   at org.apache.kafka.streams.tests.StreamsEosTest.main(StreamsEosTest.java:69){code}
 

As the commented-out line in my snippet shows, we used to block for assignment with poll(0), which is now deprecated.

It seems reasonable to me for position() to do what poll() does: call `coordinator.poll(timeout.toMillis())` early in processing to ensure the assignment is up to date.
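
To illustrate the idea of waiting for the assignment, bounded by a timeout, here is a minimal self-contained sketch. It is not the KafkaConsumer implementation: the class `AssignmentWaiter`, the method `awaitAssigned`, and the string partition names are all hypothetical stand-ins. The `refresh` callback plays the role of the coordinator poll, and the `assignment` supplier plays the role of the consumer's current assignment.

```java
import java.time.Duration;
import java.util.HashSet;
import java.util.Set;
import java.util.function.Supplier;

// Hypothetical helper, for illustration only; not part of the Kafka client.
class AssignmentWaiter {

    /**
     * Calls refresh (a stand-in for the coordinator poll) until the assignment
     * contains the given partition or the timeout elapses.
     *
     * @return true if the partition was assigned in time, false otherwise
     */
    static boolean awaitAssigned(final Supplier<Set<String>> assignment,
                                 final Runnable refresh,
                                 final String partition,
                                 final Duration timeout) {
        final long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            refresh.run();
            if (assignment.get().contains(partition)) {
                return true;
            }
            // Subtraction keeps the comparison safe if nanoTime wraps around.
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                // Short back-off between attempts; may overshoot the deadline slightly.
                Thread.sleep(10);
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static void main(final String[] args) {
        final Set<String> assigned = new HashSet<>();
        final int[] refreshes = {0};
        // Simulated coordinator: the assignment only arrives on the third refresh.
        final Runnable refresh = () -> {
            if (++refreshes[0] == 3) {
                assigned.add("topic-0");
            }
        };
        final boolean ok = awaitAssigned(() -> assigned, refresh, "topic-0", Duration.ofSeconds(1));
        System.out.println("assigned=" + ok + " after " + refreshes[0] + " refreshes");
    }
}
```

With this shape, a caller gets a plain true/false answer instead of an IllegalStateException when the assignment has not arrived yet. Whether position() should throw, block, or return on timeout is a design decision this sketch does not settle.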


