Posted to dev@kafka.apache.org by Phil Steitz <ph...@gmail.com> on 2015/09/08 23:59:43 UTC

New consumer subscribe then seek

I have been experimenting with the KafkaConsumer currently in
development [1].  Sorry if this should be a question for the user
list, but I am not sure if what I am seeing is something not working
yet or if I am misunderstanding the API.  If I use
KafkaConsumer#subscribe to subscribe to a topic and then try to use
seek(TopicPartition, offset) to position the consumer, I get an
IllegalStateException with message "No current assignment for
partition ...."  If I use assign instead to connect to the topic,
things work fine.  I can see why this is by looking at the
SubscriptionState code, which throws the ISE because
SubscriptionState#seek expects to find an assignment, but
KafkaConsumer#subscribe does not create one.
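A minimal sketch of the behavior described above, assuming nothing about Kafka's internals beyond what the exception message suggests. `ToySubscriptionState` and its methods are hypothetical stand-ins, not Kafka classes: the model only captures the rule that `seek` requires a current assignment, which `assign` creates immediately and `subscribe` does not.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical toy model, NOT Kafka's SubscriptionState: it only mimics the
// rule that seek() needs the partition to be in the current assignment.
class ToySubscriptionState {
    private final Set<String> subscribedTopics = new HashSet<>();
    // partition name (e.g. "events-0") -> fetch position (null = not yet set)
    private final Map<String, Long> assignment = new HashMap<>();

    // Like assign(): the given partitions become assigned immediately.
    void assign(Set<String> partitions) {
        assignment.clear();
        for (String p : partitions) {
            assignment.put(p, null);
        }
    }

    // Like subscribe(): only records interest in topics; in the real consumer
    // the assignment would arrive later, from the group coordinator.
    void subscribe(Set<String> topics) {
        subscribedTopics.addAll(topics);
    }

    // Fails unless the partition is currently assigned.
    void seek(String partition, long offset) {
        if (!assignment.containsKey(partition)) {
            throw new IllegalStateException(
                "No current assignment for partition " + partition);
        }
        assignment.put(partition, offset);
    }

    Long position(String partition) {
        return assignment.get(partition);
    }
}
```

In this model, assign(Set.of("events-0")) followed by seek("events-0", 42) succeeds, while subscribe(Set.of("events")) followed by the same seek throws IllegalStateException, mirroring the error reported above. The topic name "events" is made up for illustration.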

I know this is unreleased code and I am not looking for help here;
I am actually looking *to* help, but I am still learning the code.
I am happy to open a ticket with a test case if that would help, or to
submit a javadoc patch if I am misunderstanding the API and it can be
made clearer.

Thanks!

Phil

[1] ff189fa05ccdacac100f3d15d167dcbe561f57a6


Re: New consumer subscribe then seek

Posted by Phil Steitz <ph...@gmail.com>.
On 9/8/15 6:58 PM, Jason Gustafson wrote:
> Hey Phil,
> [...]
> Does that make sense?

Yes, this makes sense.  Thanks!

Phil


Re: New consumer subscribe then seek

Posted by Jason Gustafson <ja...@confluent.io>.
Hey Phil,

You've stumbled onto one of the tricky aspects of the new consumer that
we've been talking about recently. KafkaConsumer.subscribe() is
asynchronous in the sense that it will return before partitions have been
assigned. We could make it synchronous, but we wouldn't be able to
guarantee how long the assignment would be active since other members of
the group or metadata changes can cause the coordinator to rebalance the
assignment. The best place to perform a seek would probably be in the
rebalance callback, which can be passed to the overload of subscribe()
that accepts a listener. The code might look something like this:

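import java.util.Collection;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

consumer.subscribe(topics, new ConsumerRebalanceListener() {
  @Override
  public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
    // seek to the initial offset for the assigned partitions here
  }
  @Override
  public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
    // commit offsets if you need to
  }
});

while (true) {
  // assumes a consumer with String keys and values
  ConsumerRecords<String, String> records = consumer.poll(100);
  // do stuff with records
}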
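The ordering described above can be simulated without a broker. The sketch below is a hypothetical, simplified model, not KafkaConsumer or its listener API: `ToyConsumer.subscribe` only stores the listener, `rebalanceTo` is an invented hook standing in for the group coordinator, and `poll` applies the pending assignment and runs the callbacks, so a seek issued from `onPartitionsAssigned` finds the partition assigned.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical listener interface shaped like a rebalance callback.
interface ToyRebalanceListener {
    void onPartitionsRevoked(Collection<String> partitions);
    void onPartitionsAssigned(Collection<String> partitions);
}

// Hypothetical toy consumer, NOT KafkaConsumer: it models only the ordering
// of subscribe, rebalance callbacks, and seek.
class ToyConsumer {
    private final Map<String, Long> positions = new HashMap<>();
    private ToyRebalanceListener listener;
    private List<String> pendingAssignment; // set by the simulated coordinator

    // Returns immediately; no partitions are assigned yet.
    void subscribe(ToyRebalanceListener listener) {
        this.listener = listener;
    }

    // Invented stand-in for the group coordinator choosing a new assignment.
    void rebalanceTo(List<String> partitions) {
        pendingAssignment = new ArrayList<>(partitions);
    }

    // Fails unless the partition is currently assigned.
    void seek(String partition, long offset) {
        if (!positions.containsKey(partition)) {
            throw new IllegalStateException(
                "No current assignment for partition " + partition);
        }
        positions.put(partition, offset);
    }

    Long position(String partition) {
        return positions.get(partition);
    }

    // Any pending rebalance is applied here, and the callbacks run here.
    void poll() {
        if (pendingAssignment == null || listener == null) {
            return;
        }
        List<String> revoked = new ArrayList<>(positions.keySet());
        listener.onPartitionsRevoked(revoked);
        positions.clear();
        for (String p : pendingAssignment) {
            positions.put(p, 0L); // hypothetical default starting position
        }
        List<String> assigned = pendingAssignment;
        pendingAssignment = null;
        // Partitions are assigned before the callback, so seek() succeeds in it.
        listener.onPartitionsAssigned(assigned);
    }
}
```

Because a later rebalance can revoke partitions (here, a second rebalanceTo call followed by poll), a seek issued once right after subscribe would not survive the next change of assignment in this model; putting it in the assignment callback re-applies it every time.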

Does that make sense?


Thanks,
Jason

