Posted to users@kafka.apache.org by craig w <co...@gmail.com> on 2016/02/24 16:10:32 UTC

No current assignment for partition within ConsumerRebalanceListener

I am using the Java Kafka 0.9 client. When I subscribe to a topic I provide
a ConsumerRebalanceListener. In the "onPartitionsAssigned" method I am
doing this:

partitions.foreach( (tp: TopicPartition) => {
    consumer.seek(tp, consumer.position(tp))
})

However, sometimes I end up in an infinite loop with IllegalStateExceptions
being thrown [1]:

No current assignment for partition <topic-partition>

I thought it was safe to seek because the consumer should have been
assigned when this method is invoked. Am I missing something?

For what it's worth, I am manually committing offsets (using commitSync).

[1] -
https://apache.googlesource.com/kafka/+/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#228

Re: No current assignment for partition within ConsumerRebalanceListener

Posted by Jason Gustafson <ja...@confluent.io>.
You shouldn't need to seek at all. When you commit offsets for a partition,
they are stored in Kafka and become available to any member that is
assigned that partition. The default behavior after every rebalance is to
look up the last committed offset for each assigned partition and seek to
it. This allows whichever consumer is assigned the partition to pick up
from wherever the last consumer left off.

-Jason
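
As an illustration of the default behavior described above, here is a minimal sketch in Scala, assuming a String-keyed consumer. The names LoggingRebalanceListener and SubscribeWithoutSeek are made up for this example. The listener only logs and never calls seek() or position(); the consumer itself resumes each newly assigned partition from the group's last committed offset.

```scala
import java.util
import org.apache.kafka.clients.consumer.{ConsumerRebalanceListener, KafkaConsumer}
import org.apache.kafka.common.TopicPartition

// Hypothetical listener that deliberately does not seek. After a rebalance the
// consumer looks up the last committed offset for each assigned partition itself.
class LoggingRebalanceListener extends ConsumerRebalanceListener {
  override def onPartitionsRevoked(partitions: util.Collection[TopicPartition]): Unit =
    println(s"Revoked: $partitions")

  override def onPartitionsAssigned(partitions: util.Collection[TopicPartition]): Unit =
    println(s"Assigned: $partitions")
}

object SubscribeWithoutSeek {
  // Subscribes to a single topic; no manual seek is wired in anywhere.
  def subscribe(consumer: KafkaConsumer[String, String], topic: String): Unit =
    consumer.subscribe(util.Arrays.asList(topic), new LoggingRebalanceListener)
}
```

If a partition has no committed offset yet, the consumer falls back to the auto.offset.reset setting to choose a starting position.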

On Wed, Feb 24, 2016 at 10:46 AM, craig w <co...@gmail.com> wrote:

> So would I seekToEnd in that case? I can't seek to the last offset I
> manually committed since I may not be assigned the same partitions as
> before.

Re: No current assignment for partition within ConsumerRebalanceListener

Posted by craig w <co...@gmail.com>.
So would I seekToEnd in that case? I can't seek to the last offset I
manually committed since I may not be assigned the same partitions as before.

On Wed, Feb 24, 2016 at 1:44 PM, Jason Gustafson <ja...@confluent.io> wrote:

> Sure, but in that case, the commits are still being stored in Kafka, so
> resetting to the last committed position seems like what you want.



-- 

https://github.com/mindscratch
https://www.google.com/+CraigWickesser
https://twitter.com/mind_scratch
https://twitter.com/craig_links

Re: No current assignment for partition within ConsumerRebalanceListener

Posted by Jason Gustafson <ja...@confluent.io>.
Sure, but in that case, the commits are still being stored in Kafka, so
resetting to the last committed position seems like what you want.

-Jason

On Wed, Feb 24, 2016 at 10:42 AM, craig w <co...@gmail.com> wrote:

> In this case I am managing offsets manually. When a message is done being
> processed, I invoke "commitSync" passing it the map of commits to sync.

Re: No current assignment for partition within ConsumerRebalanceListener

Posted by craig w <co...@gmail.com>.
In this case I am managing offsets manually. When a message is done being
processed, I invoke "commitSync" passing it the map of commits to sync.
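
For reference, a per-record commit along these lines might look like the following sketch. The ManualCommit object and its method names are hypothetical and not taken from the code discussed in this thread. Note that the committed value is the offset of the next record to read, so it is the processed record's offset plus one.

```scala
import java.util.Collections
import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.common.TopicPartition

object ManualCommit {
  // Builds the single-entry map passed to commitSync for one processed record.
  // The committed offset points at the NEXT record to read, hence the + 1.
  def offsetToCommit(record: ConsumerRecord[_, _]): java.util.Map[TopicPartition, OffsetAndMetadata] =
    Collections.singletonMap(
      new TopicPartition(record.topic(), record.partition()),
      new OffsetAndMetadata(record.offset() + 1))

  // Commits synchronously once the record has been fully processed.
  def commitProcessed[K, V](consumer: KafkaConsumer[K, V], record: ConsumerRecord[K, V]): Unit =
    consumer.commitSync(offsetToCommit(record))
}
```

Because the commit is keyed by topic and partition rather than by consumer, whichever group member is assigned the partition after a rebalance sees the same committed offset.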

On Wed, Feb 24, 2016 at 1:35 PM, Jason Gustafson <ja...@confluent.io> wrote:

> I think the problem is the call to position() from within the callback.
> When onPartitionsAssigned() gets invoked, we don't have a position yet, so
> calling position() forces the consumer to request the last committed offset,
> which evidently causes an infinite loop. It might be worth opening a JIRA for
> this since we should definitely handle it more gracefully. That aside, if
> you are trying to seek to the last committed position, there is no need to
> do it manually since this is what the consumer does automatically by
> default. The only time you should need to use seek() in the
> onPartitionsAssigned() callback is if you are managing the offsets yourself.




Re: No current assignment for partition within ConsumerRebalanceListener

Posted by Jason Gustafson <ja...@confluent.io>.
I think the problem is the call to position() from within the callback.
When onPartitionsAssigned() gets invoked, we don't have a position yet, so
calling position() forces the consumer to request the last committed offset,
which evidently causes an infinite loop. It might be worth opening a JIRA for
this since we should definitely handle it more gracefully. That aside, if
you are trying to seek to the last committed position, there is no need to
do it manually since this is what the consumer does automatically by
default. The only time you should need to use seek() in the
onPartitionsAssigned() callback is if you are managing the offsets yourself.

-Jason
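
To make the last point concrete, here is a hedged sketch of a listener for the case where offsets are kept outside Kafka. OffsetStore is a hypothetical interface standing in for whatever database or file holds the offsets; it is not part of the Kafka client. The sketch also assumes scala.collection.JavaConverters (Scala 2.11/2.12; newer versions offer scala.jdk.CollectionConverters instead). The listener seeks to the stored offset and never calls position() inside the callback.

```scala
import java.util
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.{Consumer, ConsumerRebalanceListener}
import org.apache.kafka.common.TopicPartition

// Hypothetical external offset store (not a Kafka API).
trait OffsetStore {
  // Returns the next offset to read for the partition, if one was saved.
  def load(tp: TopicPartition): Option[Long]
}

class SeekToStoredOffsets(consumer: Consumer[_, _], store: OffsetStore)
    extends ConsumerRebalanceListener {

  // Nothing to do here in this sketch; a real listener might flush offsets to the store.
  override def onPartitionsRevoked(partitions: util.Collection[TopicPartition]): Unit = ()

  // Seek only where the store has an offset; other partitions keep the
  // consumer's default starting position.
  override def onPartitionsAssigned(partitions: util.Collection[TopicPartition]): Unit =
    partitions.asScala.foreach { tp =>
      store.load(tp).foreach(offset => consumer.seek(tp, offset))
    }
}
```

The listener would be passed as the second argument to subscribe(), in the same place as the listener from the original question.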

On Wed, Feb 24, 2016 at 7:10 AM, craig w <co...@gmail.com> wrote:

> I am using the Java Kafka 0.9 client. When I subscribe to a topic I provide
> a ConsumerRebalanceListener. In the "onPartitionsAssigned" method I am
> doing this:
>
> partitions.foreach( (tp: TopicPartition) => {
>     consumer.seek(tp, consumer.position(tp))
> })
>
> However, sometimes I end up in an infinite loop with IllegalStateExceptions
> being thrown [1]:
>
> No current assignment for partition <topic-partition>
>
> I thought it was safe to seek because the consumer should have been
> assigned when this method is invoked. Am I missing something?
>
> For what it's worth, I am manually committing offsets (using commitSync).
>
> [1] -
>
> https://apache.googlesource.com/kafka/+/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#228
>