Posted to dev@kafka.apache.org by Guozhang Wang <wa...@gmail.com> on 2015/06/09 22:12:29 UTC

[Discussion] New Consumer API / Protocol

This email is to kick off some discussion around the changes we want to
make to the new consumer APIs as well as their semantics. Here is a
non-comprehensive list of items on my mind:

1. Poll(timeout): the current definition of the timeout states "The time, in
milliseconds, spent waiting in poll if data is not available. If 0, waits
indefinitely." The current implementation, however, has different semantics
than stated, for example:

a) poll(timeout) can return before "timeout" has elapsed with empty consumed
data.
b) poll(timeout) can return after more than "timeout" has elapsed due to
blocking events like join-group, coordinator discovery, etc.

We should think a bit more about what semantics we really want to provide and
how to provide them in the implementation.
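One way to reconcile a) with a hard upper bound is to wrap the inner network poll in a deadline loop: early empty returns are tolerated, but the caller never blocks much longer than requested. A minimal stand-alone sketch (the `innerPoll` function is a stand-in for the real network client, not the actual consumer internals):

```java
import java.util.Collections;
import java.util.List;
import java.util.function.LongFunction;

public class BoundedPoll {
    // Loop over an inner poll (which may return early and empty) until either
    // data arrives or the caller's deadline passes, so the outer poll(timeout)
    // never blocks much longer than requested.
    public static List<String> poll(LongFunction<List<String>> innerPoll, long timeoutMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (true) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                return Collections.emptyList(); // timeout elapsed: empty return
            }
            // In the real client this call would block for up to `remaining` ms.
            List<String> records = innerPoll.apply(remaining);
            if (!records.isEmpty()) {
                return records;
            }
        }
    }

    public static void main(String[] args) {
        // A stand-in for the network poll: returns nothing for the first two
        // calls, then a single record.
        int[] calls = {0};
        List<String> out = poll(remaining -> ++calls[0] < 3
                ? Collections.emptyList()
                : Collections.singletonList("record-0"), 1000);
        System.out.println(out); // → [record-0]
    }
}
```

This only bounds time spent in the poll loop itself; it does not address b), where a blocking join-group can still overrun the deadline.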

2. Thread safety: currently we have a coarse-grained locking mechanism
that provides thread safety but blocks commit / position / etc. calls
while poll() is in progress. We are considering removing the
coarse-grained locking in favor of an additional Consumer.wakeup() call to
break the polling, and instead suggesting that users have one consumer
client per thread, which aligns with the design of a single-threaded
consumer (KAFKA-2123).
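The proposed model can be illustrated with a toy stand-in (this is a sketch of the idea, not the actual consumer implementation): the poller is owned by a single thread, and the only call another thread may make is wakeup(), which breaks the owner out of a blocking poll.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class WakeupSketch {
    // Toy model of the proposed design: the consumer stays single-threaded,
    // and wakeup() is the one cross-thread-safe call.
    static class Poller {
        private final AtomicBoolean wakeup = new AtomicBoolean(false);

        public void wakeup() { wakeup.set(true); }

        // Returns true if woken up, false if the full timeout elapsed.
        public boolean poll(long timeoutMs) {
            long deadline = System.currentTimeMillis() + timeoutMs;
            while (System.currentTimeMillis() < deadline) {
                if (wakeup.getAndSet(false)) return true;
                try { Thread.sleep(2); } // stand-in for one slice of network I/O
                catch (InterruptedException e) { Thread.currentThread().interrupt(); return false; }
            }
            return false;
        }
    }

    public static void main(String[] args) throws Exception {
        Poller poller = new Poller();
        Thread owner = new Thread(() -> {
            while (!poller.poll(60_000)) { } // the event loop, pinned to one thread
        });
        owner.start();
        Thread.sleep(20);
        poller.wakeup(); // safe to call from any other thread
        owner.join(2_000);
        System.out.println(owner.isAlive() ? "still polling" : "woken up");
    }
}
```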

3. Commit(): we want to improve the async commit calls to add a callback
handler that is invoked when the commit completes, and to guarantee ordering
of commit calls with retry policies (KAFKA-2168). In addition, we want to
extend the API to expose attaching / fetching offset metadata stored in the
Kafka offset manager.
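A toy model of the ordering guarantee (this is an illustration of the idea discussed for KAFKA-2168, not the actual consumer internals; the Consumer<Long> callback shape is hypothetical, as the real API would pass offsets, metadata, and possibly an exception):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

public class CommitQueue {
    // Pending commits complete strictly in submission order: a retried commit
    // holds back the ones queued behind it rather than being reordered.
    static class Commit {
        final long offset;
        final Consumer<Long> callback; // invoked with the committed offset
        int attemptsLeft;
        Commit(long offset, int attemptsLeft, Consumer<Long> callback) {
            this.offset = offset; this.attemptsLeft = attemptsLeft; this.callback = callback;
        }
    }

    private final Deque<Commit> pending = new ArrayDeque<>();

    public void commitAsync(long offset, int retries, Consumer<Long> callback) {
        pending.addLast(new Commit(offset, retries + 1, callback));
    }

    // One round of the poll loop: only the head commit is attempted; on a
    // simulated failure it is retried before anything behind it runs. Once
    // retries are exhausted it completes anyway (with an error in a real API).
    public void tick(boolean headSucceeds) {
        Commit head = pending.peekFirst();
        if (head == null) return;
        if (headSucceeds || --head.attemptsLeft == 0) {
            pending.removeFirst();
            head.callback.accept(head.offset);
        }
    }

    public static void main(String[] args) {
        CommitQueue q = new CommitQueue();
        List<Long> completed = new ArrayList<>();
        q.commitAsync(10, 3, completed::add);
        q.commitAsync(20, 3, completed::add);
        q.tick(false); // offset 10 fails once and is retried...
        q.tick(true);  // ...then succeeds before offset 20 is attempted
        q.tick(true);
        System.out.println(completed); // → [10, 20], in submission order
    }
}
```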

4. OffsetFetchRequest: currently, when handling OffsetCommitRequest, we
check the generation id and the assigned partitions before accepting the
request if the group is using Kafka for partition management, but for
OffsetFetchRequest we cannot do this check since the request does not
include groupId / consumerId information. Do people think this is OK, or
should we add this as we did in OffsetCommitRequest?

5. New APIs: there are some other requests to add:

a) offsetsBeforeTime(timestamp): or would seekToEnd and seekToBeginning be
sufficient?

b) listTopics(): or should we just require users to use AdminUtils for such
operations?
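For context, a toy model of what an offsetsBeforeTime(timestamp) lookup could mean at segment granularity (a sketch only; the data layout and fallback behavior here are assumptions, not the broker's actual offset index):

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

public class OffsetsBeforeTime {
    // Toy model: each log segment is indexed by a timestamp, and the query
    // returns the start offset of the newest segment at or before the
    // requested time.
    public static long offsetBefore(NavigableMap<Long, Long> segmentTimeToStartOffset,
                                    long timestampMs) {
        Map.Entry<Long, Long> e = segmentTimeToStartOffset.floorEntry(timestampMs);
        return e == null ? 0L : e.getValue(); // fall back to the earliest offset
    }

    public static void main(String[] args) {
        NavigableMap<Long, Long> segments = new TreeMap<>();
        segments.put(1_000L, 0L);   // segment from around t=1000 starts at offset 0
        segments.put(2_000L, 500L); // segment from around t=2000 starts at offset 500
        segments.put(3_000L, 900L); // segment from around t=3000 starts at offset 900
        System.out.println(offsetBefore(segments, 2_500L)); // → 500
    }
}
```

The segment granularity is also the limitation of such an API: a timestamp in the middle of a segment can only be resolved to that segment's start offset.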

There may be other issues that I have missed here, so folks, please bring
them up if you have thought of anything else.

-- Guozhang

Re: [Discussion] New Consumer API / Protocol

Posted by Guozhang Wang <wa...@gmail.com>.
Yeah I think it is better to discuss these points in the KIP meeting, or it
may become a long thread. Let's do that this Tuesday.

Guozhang

On Thu, Jun 11, 2015 at 5:48 PM, Jun Rao <ju...@confluent.io> wrote:

> Guozhang,
>
> Perhaps we can discuss this in our KIP hangout next week?
>
> Thanks,
>
> Jun
>



-- 
-- Guozhang

Re: [Discussion] New Consumer API / Protocol

Posted by Jun Rao <ju...@confluent.io>.
Guozhang,

Perhaps we can discuss this in our KIP hangout next week?

Thanks,

Jun

On Tue, Jun 9, 2015 at 1:12 PM, Guozhang Wang <wa...@gmail.com> wrote:


Re: [Discussion] New Consumer API / Protocol

Posted by Jason Gustafson <ja...@confluent.io>.
Just a minor correction, but #2 is KAFKA-2168 and #3 is KAFKA-2123.

For #1, I think there should be some minimal effort toward making poll
respect the timeout (with the understanding that spurious wakeups can
happen). I think this really just means calling NetworkClient.poll() in a
loop and checking whether there are any results to return. An additional
complication is that long poll timeouts can prevent heartbeats or
auto-committed offsets from being sent. This means that the maximum time
that the consumer should actually block in NetworkClient.poll() should be
the minimum of the poll timeout, the heartbeat interval, and the
auto-commit interval (if it's set). I attempted to handle this in my patch
for KAFKA-2168, which is pending review.
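The bound described above can be sketched as a pure helper (names are hypothetical; Jason's patch for KAFKA-2168 is the authoritative version):

```java
public class PollDeadline {
    // The maximum time one NetworkClient.poll() call may block: bounded by the
    // caller's poll timeout, the heartbeat interval, and (if auto-commit is
    // enabled) the auto-commit interval, so background work is never starved.
    public static long maxBlockMs(long pollTimeoutMs, long heartbeatIntervalMs,
                                  Long autoCommitIntervalMs) {
        long bound = Math.min(pollTimeoutMs, heartbeatIntervalMs);
        if (autoCommitIntervalMs != null) {
            bound = Math.min(bound, autoCommitIntervalMs);
        }
        return bound;
    }

    public static void main(String[] args) {
        System.out.println(maxBlockMs(60_000, 3_000, 5_000L)); // → 3000, heartbeat wins
        System.out.println(maxBlockMs(1_000, 3_000, null));    // → 1000, caller's timeout wins
    }
}
```

The caller then loops, calling the network poll with this bound, sending any due heartbeats or auto-commits between iterations, until the full poll timeout has elapsed.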

I tend to agree with Jay on 1b though. Trying to leave the JoinGroup
pending could get tricky.


On Tue, Jun 9, 2015 at 4:25 PM, Jay Kreps <ja...@gmail.com> wrote:


Re: [Discussion] New Consumer API / Protocol

Posted by Jay Kreps <ja...@gmail.com>.
My two cents:

Overall I think our focus as far as extensions go should be on (1) things
which would be incompatible if changed later and (2) operationalization.
There are lots of new apis that could be nice to have, but I think if
adding them later will just be an api addition, we should hold off and get
out what we have; if it will be ugly to add later, then let's try to get it
in now.

1a. I don't think there is too much value in attempting to avoid spurious
wakeups. Several people have asked about this but I think the only usage
that makes sense is in an event loop--since you can always get 0 events due
to a timeout. I think we should just document that the timeout is just
there for guidance and not strictly enforced. I think this is intuitive
since no timeout like this is ever really strictly enforced (if there is a
30 second gc pause we will be off by 30 seconds regardless of how diligent
we are in our own code).
1b. Same here. Trying to return in the middle of a joinGroup will be very
complex, so I think we should just document this behavior as well.
Also: The docs currently say that a timeout of 0 blocks forever but I think
that might be wrong. Regardless I think for consistency a timeout of 0
should be non-blocking and a timeout of MAX_INT should be used to block
"forever".

2. Here is my understanding of this one. I think there was originally some
interest in tightening up the locking to allow more parallelism in consumer
usage. Personally I think this adds a lot of complexity and would prefer to
avoid it. My concern is primarily around implementation complexity--I think
without a very careful, well abstracted threading model, a big chunk of
code with lots of ad hoc locking, even if perfect when initially written,
is just very hard to maintain. I also think the single-threaded usage
pattern is easier for the end-user and likely faster, though there are some
downsides. After some discussion I think there was a second proposal to
instead leave the locking as is and add a wakeup call. The use case for
this would be something like quickly interrupting and shutting down a
thread that is in its event loop. I think this makes sense, though we could
probably release a v1 without it if need be. I think an open question here
is whether interrupt needs to work for all calls (e.g. commit) or just
poll, with other calls having a de facto timeout from the request timeout
and retry count. I would vote for the latter if it helps with
implementation simplicity.

3. I think both these extensions make sense. It would be nice to get them
into the first release to avoid api churn.

4. Not sure I fully understand.

5a. The rationale for doing seekToBeginning and seekToEnd was that we felt
we might need to think a bit more about the offset list api before knowing
how best to expose it. We clearly need something here, but the hope was
that the two seekTo calls were good enough to get started and we could wait
to think it out properly before adding the more general call. I
think the thinking was basically (a) the time mechanism for the
intermediate segments is often wrong, (b) the protocol that does the fetch
is quite ad hoc and perhaps there is more per-segment info that should be
in that request, and (c) the api in simple consumer is very unintuitive. If
we are adding an end-state solution

5b. Not sure, but can we add it later? How would mirror maker work?


On Tue, Jun 9, 2015 at 1:12 PM, Guozhang Wang <wa...@gmail.com> wrote:


Re: [Discussion] New Consumer API / Protocol

Posted by Jiangjie Qin <jq...@linkedin.com.INVALID>.
Thanks for kicking off this discussion, Guozhang.
We might also want to discuss the API to expose the high watermark. There
has been some discussion of this in KAFKA-2076.

Thanks,

Jiangjie (Becket) Qin

On 6/9/15, 1:12 PM, "Guozhang Wang" <wa...@gmail.com> wrote:
