Posted to users@kafka.apache.org by Franco Giacosa <fg...@gmail.com> on 2016/01/15 12:26:13 UTC

commitSync CommitFailedException

Hi,

The documentation for commitSync says the following about
CommitFailedException:

 * @throws org.apache.kafka.clients.consumer.CommitFailedException if the
commit failed and cannot be retried.
  * This can only occur if you are using automatic group management with
{@link #subscribe(List)}

1. Can someone explain what automatic group management means? And if it's
just the consumer group, why might the commit fail?

2. In a consumer group, each consumer has one or more partitions assigned,
right? So if the consumers are not sharing partitions, why might the
commit fail in a way that cannot be retried?

3. This also happens in commitAsync, right? I don't see it in the
documentation, so that's why I am asking.

Thanks,
Franco.

Re: commitSync CommitFailedException

Posted by Franco Giacosa <fg...@gmail.com>.
Thanks for the answer Jason.

Does the consumer have any way to rejoin the consumer group?

2016-01-19 18:27 GMT+01:00 Jason Gustafson <ja...@confluent.io>:

> Hey Franco,
>
> This time I'll answer briefly ;)
>
> 1) Heartbeats also get invoked when you call another blocking operation
> such as commitSync().
> 2) If all consumers in the group die, the coordinator doesn't really do
> anything other than clean up some group state. In particular, it does not
> remove offset commits.
>
> -Jason

Re: commitSync CommitFailedException

Posted by Jason Gustafson <ja...@confluent.io>.
Hey Franco,

This time I'll answer briefly ;)

1) Heartbeats also get invoked when you call another blocking operation
such as commitSync().
2) If all consumers in the group die, the coordinator doesn't really do
anything other than clean up some group state. In particular, it does not
remove offset commits.

-Jason

On Sun, Jan 17, 2016 at 11:03 AM, Franco Giacosa <fg...@gmail.com> wrote:

> Hi Jason,
>
> Thanks for the detailed explanation, I hope that KIP-41 gets added fast.
>
> A few questions:
> (1) Is polling the only way to send a heartbeat? If I call poll(0),
> does that renew the session?
> (2) What happens to the coordinator if all consumers die?
>
> Franco.

Re: commitSync CommitFailedException

Posted by Franco Giacosa <fg...@gmail.com>.
Hi Jason,

Thanks for the detailed explanation, I hope that KIP-41 gets added fast.

A few questions:
(1) Is polling the only way to send a heartbeat? If I call poll(0),
does that renew the session?
(2) What happens to the coordinator if all consumers die?

Franco.




2016-01-15 19:30 GMT+01:00 Jason Gustafson <ja...@confluent.io>:

> Hi Franco,
>
> The new consumer combines the functionality of the older simple and
> high-level consumers. When used in simple mode, you have to assign the
> partitions that you want to read from using assign(). In this case, the
> consumer works alone and not in a group. Alternatively, if you use the
> subscribe() API, then a special Kafka broker (known as the coordinator)
> will coordinate all consumers with the same groupId to distribute the
> partitions of all subscribed topics among them. Each partition will be
> assigned automatically to exactly one consumer in the group. This is what
> is meant by automatic group management.
>
> Group management in the new consumer basically works like this: all members
> send a request to the coordinator indicating that they need to join the
> group. Once joined, the consumer begins sending heartbeats to the
> coordinator. If no heartbeat is received before the expiration of the
> session timeout (configured with session.timeout.ms), then the coordinator
> marks the consumer dead and asks all other consumers in the group to rejoin
> (so that partitions can be reassigned). I've skipped some details, but
> those are the important points.
>
> Now, a relatively controversial feature of the new consumer is its
> single-threaded design. Instead of sending fetches and heartbeats in a
> background thread, all IO is done in the foreground when the user calls
> poll(). This implies, in particular, that poll() must be called at least
> once per session timeout interval in order to send heartbeats. If the
> session timeout expires between consecutive calls to poll(), then the
> coordinator will think the consumer is dead and it will reassign its
> partitions to other members. If you then try to commit offsets from that
> consumer, the coordinator will reject the request since it has already
> kicked it out of the group. When this happens, KafkaConsumer throws
> CommitFailedException since there is no way that the commit can succeed.
> And yes, you can see this in commitAsync as well (in the
> OffsetCommitCallback).
>
> Of course the main reason why the consumer would fail to poll often enough
> is that message processing is also done in the same thread. If it takes
> longer than the session timeout to handle the previous batch of messages,
> then the consumer gets kicked out. It may seem a little strange to have
> both message processing and heartbeating in the same thread, but ensuring
> consumer liveness was one of the design objectives. If heartbeats were sent
> from a background thread, then the message processor could die while the
> heartbeat thread remained active. In this case, the consumer would hold onto
> the partitions indefinitely until it had been shut down.
>
> So what can you do if you see this? First, confirm that it's actually
> happening. Record the interval between subsequent calls to poll() and check
> if the CommitFailedException is thrown after the session timeout has
> expired (the default is 30s by the way). If it is, then there are basically
> two choices for dealing with this problem at the moment: 1) increase the
> session timeout to give more time for message processing, and 2) move the
> processing to another thread. If you do the latter of these, you have to
> be careful about coordinating commits with the message processing thread and
> about how to notify it of rebalances (we have a ConsumerRebalanceListener that
> you can work with for this purpose). You can also try to tweak
> "max.partition.fetch.bytes", but this can be dangerous if you don't know
> the maximum size of the messages you have to handle.
>
> And for what it's worth, we're planning to add a new configuration
> "max.poll.records" to set an upper limit on the number of messages returned
> from poll() (assuming that KIP-41 is approved). This can make it easier to
> limit the message processing time so that there is less risk of running
> over the session timeout.
>
> Hope that helps!
>
> -Jason

Re: commitSync CommitFailedException

Posted by Jason Gustafson <ja...@confluent.io>.
Hi Franco,

The new consumer combines the functionality of the older simple and
high-level consumers. When used in simple mode, you have to assign the
partitions that you want to read from using assign(). In this case, the
consumer works alone and not in a group. Alternatively, if you use the
subscribe() API, then a special Kafka broker (known as the coordinator)
will coordinate all consumers with the same groupId to distribute the
partitions of all subscribed topics among them. Each partition will be
assigned automatically to exactly one consumer in the group. This is what
is meant by automatic group management.
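
As a rough illustration of "each partition is assigned to exactly one
consumer in the group", here is a minimal Java sketch. It is a toy
round-robin distribution and not Kafka's actual assignment logic; the
names ToyGroupAssignment and assign are made up for this example.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy illustration only: this is NOT Kafka's assignor implementation.
final class ToyGroupAssignment {

    // Distributes partition numbers 0..partitionCount-1 across the members
    // round-robin, so that every partition has exactly one owner.
    static Map<String, List<Integer>> assign(List<String> members, int partitionCount) {
        if (members.isEmpty()) {
            throw new IllegalArgumentException("group has no members");
        }
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String member : members) {
            assignment.put(member, new ArrayList<>());
        }
        for (int partition = 0; partition < partitionCount; partition++) {
            String owner = members.get(partition % members.size());
            assignment.get(owner).add(partition);
        }
        return assignment;
    }
}
```

With members c1 and c2 and five partitions, c1 would own partitions 0, 2
and 4, and c2 would own 1 and 3. Calling assign again with only c1 models
what a rebalance does after c2 leaves the group.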

Group management in the new consumer basically works like this: all members
send a request to the coordinator indicating that they need to join the
group. Once joined, the consumer begins sending heartbeats to the
coordinator. If no heartbeat is received before the expiration of the
session timeout (configured with session.timeout.ms), then the coordinator
marks the consumer dead and asks all other consumers in the group to rejoin
(so that partitions can be reassigned). I've skipped some details, but
those are the important points.

Now, a relatively controversial feature of the new consumer is its
single-threaded design. Instead of sending fetches and heartbeats in a
background thread, all IO is done in the foreground when the user calls
poll(). This implies, in particular, that poll() must be called at least
once per session timeout interval in order to send heartbeats. If the
session timeout expires between consecutive calls to poll(), then the
coordinator will think the consumer is dead and it will reassign its
partitions to other members. If you then try to commit offsets from that
consumer, the coordinator will reject the request since it has already
kicked it out of the group. When this happens, KafkaConsumer throws
CommitFailedException since there is no way that the commit can succeed.
And yes, you can see this in commitAsync as well (in the
OffsetCommitCallback).
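
The sequence described above (join, heartbeat, session expiry, rejected
commit) can be sketched with a small toy model in Java. This is only an
illustration under simplifying assumptions, not the coordinator's real
code: ToyCoordinator and its methods are hypothetical, time is passed in
explicitly, and a rejected commit is modelled as a false return value
where the real client would throw CommitFailedException.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of the behaviour described above; not Kafka's coordinator code.
final class ToyCoordinator {
    private final long sessionTimeoutMs;
    // memberId -> time (ms) at which the last heartbeat was received
    private final Map<String, Long> lastHeartbeat = new HashMap<>();

    ToyCoordinator(long sessionTimeoutMs) {
        this.sessionTimeoutMs = sessionTimeoutMs;
    }

    // Joining the group starts the member's session.
    void join(String memberId, long nowMs) {
        lastHeartbeat.put(memberId, nowMs);
    }

    // Returns false if the member is unknown or its session already expired.
    boolean heartbeat(String memberId, long nowMs) {
        expireDeadMembers(nowMs);
        if (!lastHeartbeat.containsKey(memberId)) {
            return false;
        }
        lastHeartbeat.put(memberId, nowMs);
        return true;
    }

    // A commit is accepted only from a member that is still in the group.
    boolean commit(String memberId, long nowMs) {
        expireDeadMembers(nowMs);
        return lastHeartbeat.containsKey(memberId);
    }

    // Drops every member whose last heartbeat is older than the session timeout.
    private void expireDeadMembers(long nowMs) {
        lastHeartbeat.values().removeIf(last -> nowMs - last > sessionTimeoutMs);
    }
}
```

In this model, a member that last sent a heartbeat at t=10s with a 30s
session timeout can still commit at t=35s, but a commit at t=45s is
rejected because the session has expired in the meantime.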

Of course the main reason why the consumer would fail to poll often enough
is that message processing is also done in the same thread. If it takes
longer than the session timeout to handle the previous batch of messages,
then the consumer gets kicked out. It may seem a little strange to have
both message processing and heartbeating in the same thread, but ensuring
consumer liveness was one of the design objectives. If heartbeats were sent
from a background thread, then the message processor could die while the
heartbeat thread remained active. In this case, the consumer would hold onto
the partitions indefinitely until it had been shut down.

So what can you do if you see this? First, confirm that it's actually
happening. Record the interval between subsequent calls to poll() and check
if the CommitFailedException is thrown after the session timeout has
expired (the default is 30s by the way). If it is, then there are basically
two choices for dealing with this problem at the moment: 1) increase the
session timeout to give more time for message processing, and 2) move the
processing to another thread. If you do the latter of these, you have to
be careful about coordinating commits with the message processing thread and
about how to notify it of rebalances (we have a ConsumerRebalanceListener that
you can work with for this purpose). You can also try to tweak
"max.partition.fetch.bytes", but this can be dangerous if you don't know
the maximum size of the messages you have to handle.
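
To confirm the diagnosis described above, one option is to track the gap
between successive poll() calls and compare it with the session timeout.
The helper below is a hypothetical sketch and not part of the Kafka
client; PollGapTracker and its method names are invented for this
example, and the caller is assumed to pass in the current time in
milliseconds.

```java
// Hypothetical helper (not part of the Kafka client): call beforePoll()
// immediately before each consumer.poll(...) call.
final class PollGapTracker {
    private final long sessionTimeoutMs;
    private long lastPollMs = -1;  // -1 means poll() has not been called yet
    private long maxGapMs = 0;

    PollGapTracker(long sessionTimeoutMs) {
        this.sessionTimeoutMs = sessionTimeoutMs;
    }

    // Records the time of this poll and returns the gap since the previous
    // one (0 for the first call).
    long beforePoll(long nowMs) {
        long gap = (lastPollMs < 0) ? 0 : nowMs - lastPollMs;
        lastPollMs = nowMs;
        maxGapMs = Math.max(maxGapMs, gap);
        return gap;
    }

    // Largest gap observed so far between two successive polls.
    long maxGapMs() {
        return maxGapMs;
    }

    // True if any observed gap was longer than the session timeout.
    boolean exceededSessionTimeout() {
        return maxGapMs > sessionTimeoutMs;
    }
}
```

A poll loop could call tracker.beforePoll(System.currentTimeMillis())
before every poll() and log the returned gap. If CommitFailedException
only shows up after exceededSessionTimeout() turns true, that supports
the explanation given in this message.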

And for what it's worth, we're planning to add a new configuration
"max.poll.records" to set an upper limit on the number of messages returned
from poll() (assuming that KIP-41 is approved). This can make it easier to
limit the message processing time so that there is less risk of running
over the session timeout.
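
For reference, the settings mentioned in this message could be collected
in a consumer configuration roughly as follows. This is a sketch with
purely illustrative values, not recommendations. It assumes a client
version in which "max.poll.records" is available (the message above
describes it as still planned), and it omits the other settings a real
consumer needs, such as the bootstrap servers, group id and
deserializers. The class name ConsumerTuningSketch is made up.

```java
import java.util.Properties;

// Sketch only: the values below are illustrative assumptions.
final class ConsumerTuningSketch {

    static Properties tunedConsumerProps() {
        Properties props = new Properties();
        // Option 1 from the message: allow more time between polls.
        props.setProperty("session.timeout.ms", "60000");
        // Limits bytes fetched per partition; risky if a single message
        // can be larger than this limit.
        props.setProperty("max.partition.fetch.bytes", "262144");
        // Upper bound on records returned by one poll() call, in client
        // versions that support this setting.
        props.setProperty("max.poll.records", "100");
        return props;
    }
}
```

Note that the broker also enforces an allowed range for the session
timeout (group.min.session.timeout.ms and group.max.session.timeout.ms),
so a larger client value may need a matching broker change.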

Hope that helps!

-Jason



