Posted to users@kafka.apache.org by Alexey Romanchuk <al...@gmail.com> on 2016/02/05 15:35:52 UTC

New client commitAsync problem

Hi all!

Right now I am working on a Reactive Streams connector to Kafka. I am using
the new client and found strange behavior of the commitAsync method, which
in some cases never invokes its callback.

I found that callback invocation happens as part of handling incoming
messages. These messages are not fetched in the background, but only during
other activity (like fetching from a topic). On the other hand, there is no
way to perform a "blank" activity just to fetch the commit confirmation from
the Consumer.

Right now, if message processing depends on commit confirmation, it is
impossible to work in a reactive way.

Here is a very small example of the problem:
https://gist.github.com/13h3r/496e802afe65233b184a
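
In short, the example does something like this (a rough Java rendering of
the gist; broker address, group id, and topic name are placeholders):

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class CommitAsyncDemo {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "demo");
            props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Collections.singletonList("test-topic"));
            consumer.poll(1000); // fetch a batch so there is something to commit

            // This callback never fires: the commit request is only sent (and
            // its response only handled) inside subsequent poll() calls.
            consumer.commitAsync((offsets, exception) ->
                System.out.println("commit confirmed: " + offsets));

            // ... no further poll(), so no confirmation ever arrives
        }
    }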

My questions are:
- is it bug or design decision?
- if it is not bug how I can write reactive consumer?

Thanks!

Re: New client commitAsync problem

Posted by Alexey Romanchuk <al...@gmail.com>.
Hello Jason,

thank you for the reply and explanation. It is now very clear how the
client works.

I have nothing more to ask about the current state of the client, but I
have a few ideas about how it could evolve.

1. Push the data received for paused partitions up to the user-level API
and let the user decide what to do with it. This would let us avoid
discarding data we already have.

2. What do you think about separating the command and data streams? With
such a separation we could implement streaming data transfer and control it
with the command stream, as in the sketch below.
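
Roughly what I mean (purely hypothetical interfaces to illustrate the idea;
nothing like this exists in the client today):

    import java.util.Collection;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.clients.consumer.OffsetCommitCallback;
    import org.apache.kafka.common.TopicPartition;

    // data stream: records are pushed to the application as they arrive
    interface DataStream<K, V> {
        void onRecords(ConsumerRecords<K, V> records);
    }

    // command stream: control actions flow independently of the data,
    // so a commit confirmation does not depend on fetching more records
    interface CommandStream {
        void commit(Map<TopicPartition, OffsetAndMetadata> offsets,
                    OffsetCommitCallback callback);
        void pause(Collection<TopicPartition> partitions);
        void resume(Collection<TopicPartition> partitions);
    }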

Thanks!


Re: New client commitAsync problem

Posted by Jason Gustafson <ja...@confluent.io>.
> - does it cost anything? is there any network activity on pause/resume?


That is a great question. The pause() method sets an internal flag which
tells the consumer not to send any more fetches. If a fetch was already in
progress and returns while the partition is still paused, the fetched data
is discarded. This matters because the consumer implements a prefetching
optimization to pipeline fetching and message processing: before returning
a set of records, it sends the next fetch to the broker so that the next
batch of messages is already available by the time poll() is called again.

Unfortunately, this prefetching strategy doesn't seem to play nicely with
pause(), at least not as I've proposed to use it above. When poll() returns
with new data for a partition, you would immediately call pause() and then
begin processing the messages. After you finish, you would call
commitAsync() to send the commit. But if the prefetch for the next records
returns before the commit (which seems possible depending on the processing
time and fetch configuration), then we'd have to discard the data. Then
after the commit returns, the partition would be unpaused and we'd resend
the fetch. In the worst case, the consumer would need to send each fetch
twice, which sounds pretty bad.

I'm actually not sure the situation is quite this bad. It would be useful
to do some testing to see whether this double-fetching is a real problem in
practice (or whether I've just overthought it). If it is an issue, there
are some options to deal with it. For example, maybe it would make sense to
give users a configuration to turn off prefetching. That, or we could hold
onto the fetched records indefinitely under the assumption that the
partition will be unpaused eventually.


> - there seems to be no API to get the current pause/resume status for
> the client. Am I wrong? What about adding such an API?


There was a patch to add a paused() API which would return the set of
partitions currently paused. Is that what you had in mind? Let me see if I
can help get that merged.
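
Assuming it lands in roughly that form, it would just be a query on the
consumer:

    // hypothetical until the patch is merged: which partitions are paused?
    Set<TopicPartition> paused = consumer.paused();
    System.out.println("currently paused: " + paused);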


> Also, you mentioned "one option". Is there another?


Haha, this is actually the only option that came to mind for what you're
trying to do, but I said "one option" in case someone cleverer than I
thought of another way.

-Jason


Re: New client commitAsync problem

Posted by Alexey Romanchuk <al...@gmail.com>.
Hi Jason!

Thanks for the reply. I tried to implement it and it looks like it works.
A few questions about pause/resume:
- does it cost anything? is there any network activity on pause/resume?
- there seems to be no API to get the current pause/resume status for the
client. Am I wrong? What about adding such an API?

Also, you mentioned "one option". Is there another?

Thanks!


Re: New client commitAsync problem

Posted by Jason Gustafson <ja...@confluent.io>.
Hey Alexey,

The API of the new consumer is designed around an event loop in which all
IO is driven by the poll() API. To make this work, you need to call poll()
in a loop (see the javadocs for examples). So in this example, when you
call commitAsync(), the request is basically just queued up to be sent. It
won't actually be transmitted, however, until the next poll() is called.
And it may not return until an even later call.

If I understand correctly, the basic problem you're trying to figure out is
how you can do an asynchronous commit without continuing to receive records
for the associated partitions. One option would be to use the pause() API
to suspend fetching from those partitions. So you could call commitAsync()
with the commits you need to send, then pause the respective partitions.
After the commit returns, you could resume() the partitions in the commit
callback, which will re-enable fetching.
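
To sketch the pattern (the pause()/resume() signatures below follow the
Collection-based form of newer clients, and process() is a placeholder for
your own handler):

    import java.util.Collections;
    import java.util.List;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    public class PauseCommitResume {
        static void process(List<ConsumerRecord<String, String>> batch) {
            // application-specific handling goes here
        }

        static void pollLoop(KafkaConsumer<String, String> consumer) {
            while (true) {
                // poll() drives all IO, including sending the queued
                // commits and invoking their callbacks
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (TopicPartition tp : records.partitions()) {
                    List<ConsumerRecord<String, String>> batch = records.records(tp);
                    // stop fetching this partition until its commit completes
                    consumer.pause(Collections.singleton(tp));
                    process(batch);
                    long nextOffset = batch.get(batch.size() - 1).offset() + 1;
                    consumer.commitAsync(
                        Collections.singletonMap(tp, new OffsetAndMetadata(nextOffset)),
                        (offsets, exception) -> {
                            // runs inside a later poll() on this same thread
                            if (exception == null)
                                consumer.resume(Collections.singleton(tp));
                            // on failure you would retry or resume as appropriate
                        });
                }
            }
        }
    }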

Hope that helps!

-Jason


