Posted to dev@kafka.apache.org by Simon Souter <si...@cakesolutions.net> on 2016/07/12 09:57:24 UTC

Re: [DISCUSS] KIP-62: Allow consumer to send heartbeats from a background thread

 Hi,

An issue I have regarding rebalancing is that a call to poll() triggers
the JoinGroupRequest when a rebalance is in progress. In cases where a
consumer is streaming more than a single batch at a time, there is no
opportunity to flush consumed batches through before the
rebalance completes. If onPartitionsRevoked were called from a
background thread, or via an alive() call, a client would have the opportunity
to hold off calling poll() until downstream messages are flushed, and only
then call poll() again to trigger the JoinGroup and onPartitionsAssigned.

The current assumption appears to be that a call to poll() indicates that
there are no more in-flight messages. Decoupling the consumer and
processor threads, or streaming multiple batches at once, therefore results in
unavoidable redeliveries during a rebalance.
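The missing hook described above amounts to being able to wait until downstream work has drained before rejoining. Here is a minimal Java sketch of the bookkeeping side of that. It assumes the caller invokes `begin()` when a polled batch enters the async pipeline and `complete()` when the batch leaves it. `InFlightTracker` and its methods are hypothetical names and are not part of the Kafka client:

```java
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical helper, not part of the Kafka client: counts batches that were
 * returned by poll() and handed to an async pipeline but have not finished yet.
 */
class InFlightTracker {
    private int inFlight = 0;

    /** Record that a polled batch has been dispatched downstream. */
    synchronized void begin() {
        inFlight++;
    }

    /** Record that downstream processing of one batch has finished. */
    synchronized void complete() {
        if (inFlight == 0) {
            throw new IllegalStateException("complete() called without a matching begin()");
        }
        inFlight--;
        if (inFlight == 0) {
            notifyAll(); // wake any thread blocked in awaitDrained()
        }
    }

    synchronized int inFlight() {
        return inFlight;
    }

    /**
     * Blocks until no batches are in flight or the timeout elapses, and returns
     * true if the pipeline drained in time. An interrupt counts as a failed wait:
     * the flag is restored and false is returned, because a rebalance callback
     * cannot throw the checked InterruptedException.
     */
    synchronized boolean awaitDrained(long timeout, TimeUnit unit) {
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        while (inFlight > 0) {
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0) {
                return false;
            }
            try {
                // Object.wait() on this monitor, bounded by the remaining time
                TimeUnit.NANOSECONDS.timedWait(this, remaining);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }
}
```

A `ConsumerRebalanceListener` could call `awaitDrained(...)` at the start of `onPartitionsRevoked()` and commit offsets afterwards. The wait must run on a thread that the pipeline does not need in order to finish its batches. Otherwise the wait can only time out. The timeout value is an assumption and would have to be tuned against the consumer's own deadlines.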

Regards

Simon Souter

https://github.com/cakesolutions/scala-kafka-client


-- 

*Simon Souter*

Software Engineer - Team Lead
Cake Solutions Limited


Find out more about The Art of Possible <http://www.cakesolutions.net/>

Overview videos <http://www.cakesolutions.net/software-delivery> - Check
out our wide range of services

Cake’s blog  <http://www.cakesolutions.net/teamblogs>- Read all about the
exciting technical problems we are solving

Twitter <https://twitter.com/cakesolutions> - Keep up-to-date with white
papers, events, user group updates and other snippets of wisdom

T: 0845 6171200

*T:* (from outside UK): +44 (0)161 443 2355


*simons@cakesolutions.net <si...@cakesolutions.net>*

www.cakesolutions.net

Company registered in UK, No. 4184567

If you have received this e-mail in error please accept our apologies,
destroy it immediately and it would be greatly appreciated if you notified
the sender. It is your responsibility to protect your system from viruses
and any other harmful code or device. We try to eliminate them from e-mails
and attachments; but we accept no liability for any which remain. We may
monitor or access any or all e-mails sent to us.

Re: [DISCUSS] KIP-62: Allow consumer to send heartbeats from a background thread

Posted by Jason Gustafson <ja...@confluent.io>.
Hi Simon,

Yeah, that makes sense. The current workaround for this kind of use case is
to use the pause/resume APIs to suspend fetching. This basically
lets you use poll() as a heartbeat() API, but you need a separate thread
for the poll loop, and you still have to be able to flush any pending
messages once the onPartitionsRevoked() callback is triggered. This is a
basic limitation, since the consumer itself does not control when the group
needs to rebalance. In general, I agree that we could probably use some
improvements to make async usage a little nicer to work with. Although we
rejected some of those options for this KIP, we tried to leave the door
open to reconsider them in the future.
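The pause/resume workaround can be reduced to a small policy that the dedicated poll thread checks on every iteration. It stops fetching when too much is in flight and starts again once the backlog shrinks. The thread keeps calling poll() throughout, so the consumer stays in the group. Below is a sketch in Java. It assumes the pipeline can report an in-flight record count. `PauseController` and its watermarks are hypothetical. The `pause()`, `resume()`, `paused()` and `assignment()` calls mentioned in the comments are methods of the Java `KafkaConsumer`:

```java
/**
 * Hypothetical backpressure policy, not part of the Kafka client. It decides
 * when a dedicated poll thread should pause or resume fetching, based on how
 * many records are still in flight downstream.
 */
class PauseController {
    enum Action { PAUSE, RESUME, NONE }

    private final int highWatermark; // pause at or above this many in-flight records
    private final int lowWatermark;  // resume at or below this many in-flight records
    private boolean paused = false;

    PauseController(int highWatermark, int lowWatermark) {
        if (lowWatermark < 0 || lowWatermark >= highWatermark) {
            throw new IllegalArgumentException("require 0 <= low < high");
        }
        this.highWatermark = highWatermark;
        this.lowWatermark = lowWatermark;
    }

    /**
     * Called once per iteration of the poll loop, before poll(). With a real
     * KafkaConsumer the caller would map PAUSE to consumer.pause(consumer.assignment())
     * and RESUME to consumer.resume(consumer.paused()), then call poll() either way.
     */
    Action onPollLoopIteration(int inFlightRecords) {
        if (!paused && inFlightRecords >= highWatermark) {
            paused = true;
            return Action.PAUSE;
        }
        if (paused && inFlightRecords <= lowWatermark) {
            paused = false;
            return Action.RESUME;
        }
        return Action.NONE;
    }

    boolean isPaused() {
        return paused;
    }
}
```

The policy uses two watermarks rather than one. With a single threshold, a backlog that hovers around it would toggle pause and resume on alternate polls.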

Thanks,
Jason



On Tue, Jul 19, 2016 at 4:29 AM, Simon Souter <si...@cakesolutions.net>
wrote:

>  Hi Jason,
>
> Thanks for the response. The additional timeout is certainly a welcome
> addition.  The issue with the onPartitionsRevoked() mechanism is that it
> is driven through a call to poll() which works well when consuming from a
> single thread in a loop.  We consume via a event driven wrapper around the
> Java client that decouples polling from message processing and can have
> more than one batch in-flight through an async pipeline.  It is not easy to
> orchestrate an attempt the flush and commit of in-flight messages during a
> call to onPartitionsRevoked in this case.  Some mechanism to allow a client
> to indicate a flush is complete and is ready to rejoin that is not inferred
> from a call to poll() would help.
>
> Maybe this is out of scope of this KIP as described, but some of the
> rejected alternatives would appear to address the async use case:
>
> *Add a separate API the user can call to indicate liveness*
> This would be preferable from my point of view.  Our calls to the Kafka
> driver and driven though an Akka message dispatcher (lightweight thread).
> We can drive hundreds of Kafka Consumers from a handful of threads.  Adding
> a mandatory blocking thread to consumer would be a step backwards from our
> point of view.  It would be desirable to provide both options.
>
> *Move rebalancing to the background thread instead of heartbeats only?*
> This would be the ideal solution to support asynchronous event driven
> streaming from the driver.  It would be great to be able to optionally
> handle rebalance events in the current way or choose to handle them in via
> health check's thread with the ability to specify precisely when to rejoin.
>
> It appears that the Kafka contributors are aware of these concerns as they
> have been discussed on various wikis and mailing lists, not to mention
> being raised by the Kafka streams and Akka Reactive Kafka devs.  I would
> love to see a KIP to tackle async streaming use cases specifically.  We
> have already managed to provide a pretty solid async wrapper around the
> driver as-is, although there is some pretty tricky logic required to do so
> based on the current interface and in some cases we cant totally avoid
> redeliveries when rebalancing.
>
> Regards,
>
> *Simon* *Souter*
> https://github.com/cakesolutions/scala-kafka-client
>
> On 19 July 2016 at 02:22, Jason Gustafson <ja...@confluent.io> wrote:
>
> > Hey Simon,
> >
> > Sorry for the late response. The onPartitionsRevoked() hook is called
> > before the rebalance begins (that is, before the JoinGroup is sent) and
> is
> > intended to be used to flush uncommitted data and to commit corresponding
> > offsets. One of the main purposes of the KIP is to decouple the time
> > allowed to complete this from the session timeout, which now is used only
> > to detect failed or unreachable processes. This should give you more time
> > in onPartitionsRevoked() to cleanup existing state without sacrificing
> > failure detection. Does that make sense?
> >
> > Thanks,
> > Jason
> >
> > On Tue, Jul 12, 2016 at 2:57 AM, Simon Souter <si...@cakesolutions.net>
> > wrote:
> >
> > >  Hi,
> > >
> > > An issue I have regarding rebalancing, is that a call to poll()
> triggers
> > > the JoinGroupRequest when rebalancing is in process.  In cases where a
> > > consumer is streaming more than a single batch at a time, there is no
> > > opportunity to attempt to flush any consumed batches through prior to
> the
> > > rebalance completing.  If onPartitionsRevoked would be called via a
> > > background thread, or an alive() call, there would be an opportunity
> for
> > a
> > > client to hold off from calling poll, until downstream messages are
> > flushed
> > > prior to calling poll again to trigger the Join and
> onPartitionsAssigned.
> > >
> > > The current assumption appears to be that a call to poll() indicates
> that
> > > there are no more in-flight messages.  Attempting to decouple consumer
> > and
> > > processor threads or the streaming of multiple batches results in
> > > unavoidable redeliveries during a rebalance.
> > >
> > > Regards
> > >
> > > Simon Souter
> > >
> > > https://github.com/cakesolutions/scala-kafka-client
> > >
> > >
> > > --
> > >
> > > *Simon Souter*
> > >
> > > Software Engineer - Team Lead
> > > Cake Solutions Limited
> > >
> > >
> > > Find out more about The Art of Possible <http://www.cakesolutions.net/
> >
> > >
> > > Overview videos <http://www.cakesolutions.net/software-delivery> -
> Check
> > > out our wide range of services
> > >
> > > Cake’s blog  <http://www.cakesolutions.net/teamblogs>- Read all about
> > the
> > > exciting technical problems we are solving
> > >
> > > Twitter <https://twitter.com/cakesolutions> - Keep up-to-date with
> white
> > > papers, events, user group updates and other snippets of wisdom
> > >
> > > T: 0845 6171200
> > >
> > > *T:* (from outside UK): +44 (0)161 443 2355
> > >
> > >
> > > *simons@cakesolutions.net <si...@cakesolutions.net>*
> > >
> > > www.cakesolutions.net
> > >
> > > Company registered in UK, No. 4184567
> > >
> > > If you have received this e-mail in error please accept our apologies,
> > > destroy it immediately and it would be greatly appreciated if you
> > notified
> > > the sender. It is your responsibility to protect your system from
> viruses
> > > and any other harmful code or device. We try to eliminate them from
> > e-mails
> > > and attachments; but we accept no liability for any which remain. We
> may
> > > monitor or access any or all e-mails sent to us.
> > >
> >
>
>
>
> --
>
> *Simon Souter*
>
> Software Engineer - Team Lead
> Cake Solutions Limited
>
>
> Find out more about The Art of Possible <http://www.cakesolutions.net/>
>
> Overview videos <http://www.cakesolutions.net/software-delivery> - Check
> out our wide range of services
>
> Cake’s blog  <http://www.cakesolutions.net/teamblogs>- Read all about the
> exciting technical problems we are solving
>
> Twitter <https://twitter.com/cakesolutions> - Keep up-to-date with white
> papers, events, user group updates and other snippets of wisdom
>
> T: 0845 6171200
>
> *T:* (from outside UK): +44 (0)161 443 2355
>
>
> *simons@cakesolutions.net <si...@cakesolutions.net>*
>
> www.cakesolutions.net
>
> Company registered in UK, No. 4184567
>
> If you have received this e-mail in error please accept our apologies,
> destroy it immediately and it would be greatly appreciated if you notified
> the sender. It is your responsibility to protect your system from viruses
> and any other harmful code or device. We try to eliminate them from e-mails
> and attachments; but we accept no liability for any which remain. We may
> monitor or access any or all e-mails sent to us.
>

Re: [DISCUSS] KIP-62: Allow consumer to send heartbeats from a background thread

Posted by Simon Souter <si...@cakesolutions.net>.
 Hi Jason,

Thanks for the response. The additional timeout is certainly a welcome
addition. The issue with the onPartitionsRevoked() mechanism is that it
is driven through a call to poll(), which works well when consuming from a
single thread in a loop. We consume via an event-driven wrapper around the
Java client that decouples polling from message processing and can have
more than one batch in flight through an async pipeline. In this case it is
not easy to orchestrate flushing and committing the in-flight messages during
a call to onPartitionsRevoked(). A mechanism that lets a client signal that
its flush is complete and that it is ready to rejoin, rather than inferring
this from a call to poll(), would help.
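With more than one batch in flight, batches can finish out of order. Only the contiguous prefix of finished offsets is safe to commit when partitions are revoked, and the next owner will redeliver anything past it. Here is a per-partition sketch in Java. It assumes each batch is identified by the consumer position before and after the poll that produced it, so consecutive batches abut. It follows the Kafka convention of committing the offset of the next record to consume. `CommitOffsetTracker` is a hypothetical name:

```java
import java.util.TreeMap;

/**
 * Hypothetical per-partition helper, not part of the Kafka client. Batches may
 * finish out of order, but only the contiguous prefix of finished offsets can
 * be committed; records after that prefix would be redelivered on revocation.
 */
class CommitOffsetTracker {
    // start offset -> end offset (exclusive) of batches dispatched but not finished
    private final TreeMap<Long, Long> pending = new TreeMap<>();
    // start offset -> end offset (exclusive) of finished batches not yet contiguous
    private final TreeMap<Long, Long> finished = new TreeMap<>();
    private long committable;

    CommitOffsetTracker(long startOffset) {
        this.committable = startOffset;
    }

    void dispatched(long fromOffset, long untilOffset) {
        if (untilOffset <= fromOffset) {
            throw new IllegalArgumentException("empty or inverted batch");
        }
        pending.put(fromOffset, untilOffset);
    }

    void finished(long fromOffset) {
        Long until = pending.remove(fromOffset);
        if (until == null) {
            throw new IllegalArgumentException("unknown batch starting at " + fromOffset);
        }
        finished.put(fromOffset, until);
        // Advance over every finished batch that starts exactly where the prefix ends
        Long next;
        while ((next = finished.remove(committable)) != null) {
            committable = next;
        }
    }

    /** Offset of the next record to consume, i.e. the value a commit would use. */
    long committableOffset() {
        return committable;
    }
}
```

For example, suppose batches [0, 10), [10, 25) and [25, 30) are dispatched, and only the second has finished when onPartitionsRevoked() runs. The committable offset is then still 0, so all 30 records would be redelivered.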

Maybe this is out of scope of this KIP as described, but some of the
rejected alternatives would appear to address the async use case:

*Add a separate API the user can call to indicate liveness*
This would be preferable from my point of view. Our calls to the Kafka
driver are driven through an Akka message dispatcher (lightweight threads),
so we can drive hundreds of Kafka consumers from a handful of threads. Adding
a mandatory blocking thread to each consumer would be a step backwards from
our point of view. It would be desirable to provide both options.

*Move rebalancing to the background thread instead of heartbeats only?*
This would be the ideal solution for supporting asynchronous, event-driven
streaming from the driver. It would be great to be able to optionally
handle rebalance events in the current way, or to handle them via the
health-check thread with the ability to specify precisely when to rejoin.

It appears that the Kafka contributors are aware of these concerns, as they
have been discussed on various wikis and mailing lists, not to mention
being raised by the Kafka Streams and Akka Reactive Kafka developers. I would
love to see a KIP that tackles async streaming use cases specifically. We
have already managed to provide a pretty solid async wrapper around the
driver as-is, although it takes some pretty tricky logic to do so with the
current interface, and in some cases we can't totally avoid redeliveries
when rebalancing.

Regards,

*Simon* *Souter*
https://github.com/cakesolutions/scala-kafka-client

On 19 July 2016 at 02:22, Jason Gustafson <ja...@confluent.io> wrote:





Re: [DISCUSS] KIP-62: Allow consumer to send heartbeats from a background thread

Posted by Jason Gustafson <ja...@confluent.io>.
Hey Simon,

Sorry for the late response. The onPartitionsRevoked() hook is called
before the rebalance begins (that is, before the JoinGroup is sent) and is
intended to be used to flush uncommitted data and to commit the corresponding
offsets. One of the main purposes of the KIP is to decouple the time
allowed to complete this from the session timeout, which is now used only
to detect failed or unreachable processes. This should give you more time
in onPartitionsRevoked() to clean up existing state without sacrificing
failure detection. Does that make sense?
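The decoupling described here can be modeled as two independent checks. Heartbeats from a background thread bound failure detection. A separate poll-interval limit bounds how long the application may spend between polls, including inside onPartitionsRevoked(). Below is a simplified Java model, not the client's or the coordinator's actual logic. The two limits roughly correspond to the `session.timeout.ms` and `max.poll.interval.ms` consumer settings. The class and enum are hypothetical:

```java
/**
 * Simplified illustration of two independent liveness checks, not the Kafka
 * consumer's or coordinator's real implementation:
 *  - session timeout: a background thread heartbeats, so the member is only
 *    considered dead if heartbeats stop (crash, network partition);
 *  - max poll interval: the application must call poll() often enough, which
 *    bounds how long processing (including onPartitionsRevoked()) may take.
 */
class LivenessModel {
    enum Status { ALIVE, SESSION_EXPIRED, POLL_INTERVAL_EXCEEDED }

    private final long sessionTimeoutMs;
    private final long maxPollIntervalMs;

    LivenessModel(long sessionTimeoutMs, long maxPollIntervalMs) {
        this.sessionTimeoutMs = sessionTimeoutMs;
        this.maxPollIntervalMs = maxPollIntervalMs;
    }

    Status check(long nowMs, long lastHeartbeatMs, long lastPollMs) {
        if (nowMs - lastHeartbeatMs > sessionTimeoutMs) {
            return Status.SESSION_EXPIRED;        // failure detection
        }
        if (nowMs - lastPollMs > maxPollIntervalMs) {
            return Status.POLL_INTERVAL_EXCEEDED; // processing took too long
        }
        return Status.ALIVE;
    }
}
```

Take a 10 s session timeout and a 300 s poll interval. A member that spends 60 s flushing in onPartitionsRevoked(), while its heartbeat thread keeps running, is still ALIVE in this model. A crashed process is flagged once its heartbeats are more than 10 s old.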

Thanks,
Jason

On Tue, Jul 12, 2016 at 2:57 AM, Simon Souter <si...@cakesolutions.net>
wrote:
