Posted to dev@kafka.apache.org by Rajini Sivaram <rs...@pivotal.io> on 2016/12/15 13:32:39 UTC

[DISCUSS] KIP-102 - Add close with timeout for consumers

Hi all,

I have just created KIP-102 to add a new close method for consumers with a
timeout parameter, making Consumer consistent with Producer:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-102+-+Add+close+with+timeout+for+consumers

Comments and suggestions are welcome.

Thank you...

Regards,

Rajini
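
As an illustration of the API being proposed, here is a minimal sketch of
how a timed close might be used, assuming the new method mirrors the
existing Producer close(long, TimeUnit) signature; the broker address,
topic, group id and timeout value below are placeholders, not part of the
KIP text.

    import java.util.Collections;
    import java.util.Properties;
    import java.util.concurrent.TimeUnit;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class CloseWithTimeoutExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "example-group");
            props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            try {
                consumer.subscribe(Collections.singletonList("example-topic"));
                ConsumerRecords<String, String> records = consumer.poll(1000);
                // ... process records ...
            } finally {
                // Proposed: bound the time spent committing offsets and
                // leaving the group, rather than blocking without limit.
                consumer.close(5, TimeUnit.SECONDS);
            }
        }
    }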

Re: [DISCUSS] KIP-102 - Add close with timeout for consumers

Posted by Becket Qin <be...@gmail.com>.
+1 on the idea. We have a ticket about making all the blocking calls have a
timeout in KafkaConsumer. The implementation could be a little tricky, as
Ewen mentioned. But close() is probably a simpler case because, in the worst
case, the consumer will just stop polling and heartbeating and eventually
get kicked out of the group. That is not ideal, but maybe less worrisome.
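
As background for the "kicked out of the group" point above: a consumer that
stops polling and heartbeating is evicted by the group coordinator once its
session expires. A small sketch of the relevant setting (session.timeout.ms
is a real consumer config; the value shown is only illustrative):

    import java.util.Properties;

    public class GroupSessionConfigExample {
        public static Properties consumerProps() {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "example-group");
            // If the consumer stops polling and heartbeating (for example,
            // while stuck in close()), the coordinator removes it from the
            // group after this interval and rebalances its partitions.
            props.put("session.timeout.ms", "10000");
            return props;
        }
    }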

On Thu, Dec 15, 2016 at 3:08 PM, Rajini Sivaram <rs...@pivotal.io> wrote:

> Ewen,
>
> Thank you, I will try to prototype a solution early next week to get a
> better understanding of how invasive the changes are.

Re: [DISCUSS] KIP-102 - Add close with timeout for consumers

Posted by Rajini Sivaram <rs...@pivotal.io>.
Ewen,

Thank you, I will try to prototype a solution early next week to get a
better understanding of how invasive the changes are.



On Thu, Dec 15, 2016 at 9:35 PM, Ewen Cheslack-Postava <ew...@confluent.io>
wrote:

> Rajini,
>
> Thanks for this KIP, I'd definitely like to see this. Connect has had a
> long-standing TODO around stopping sink tasks where we can't properly
> manage the rebalance process (which involves stopping consumers) because we
> lack a timeout here. Not a huge problem in practice, but would be nice to
> fix. And I know I've seen at least a half dozen requests for this (and
> other timeouts to be respected in the consumer) on the mailing list.
>
> My only concern is that this could be a pretty substantial change to the
> consumer code. We've had TODO items in the code since Jay wrote the first
> version that say something about avoiding infinite looping and waiting
> indefinitely on group membership. If we implement this, I'd hope to get it
> committed in the early part of the release cycle so we have plenty of time
> to stabilize + bug fix.
>
> -Ewen

Re: [DISCUSS] KIP-102 - Add close with timeout for consumers

Posted by Ewen Cheslack-Postava <ew...@confluent.io>.
Rajini,

Thanks for this KIP, I'd definitely like to see this. Connect has had a
long-standing TODO around stopping sink tasks where we can't properly
manage the rebalance process (which involves stopping consumers) because we
lack a timeout here. Not a huge problem in practice, but would be nice to
fix. And I know I've seen at least a half dozen requests for this (and
other timeouts to be respected in the consumer) on the mailing list.

My only concern is that this could be a pretty substantial change to the
consumer code. We've had TODO items in the code since Jay wrote the first
version that say something about avoiding infinite looping and waiting
indefinitely on group membership. If we implement this, I'd hope to get it
committed in the early part of the release cycle so we have plenty of time
to stabilize + bug fix.

-Ewen

On Thu, Dec 15, 2016 at 5:32 AM, Rajini Sivaram <rs...@pivotal.io> wrote:

> Hi all,
>
> I have just created KIP-102 to add a new close method for consumers with a
> timeout parameter, making Consumer consistent with Producer:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 102+-+Add+close+with+timeout+for+consumers
>
> Comments and suggestions are welcome.
>
> Thank you...
>
> Regards,
>
> Rajini
>

Re: [DISCUSS] KIP-102 - Add close with timeout for consumers

Posted by Rajini Sivaram <ra...@gmail.com>.
Thank you, Jason. I will start the vote.

On Thu, Jan 5, 2017 at 5:52 PM, Jason Gustafson <ja...@confluent.io> wrote:

> Yeah, if you start a vote soon, I think it has a chance to get into 0.10.2.
> I guess it's up to Ewen, but I'm happy to help review.
>
> -Jason

Re: [DISCUSS] KIP-102 - Add close with timeout for consumers

Posted by Jason Gustafson <ja...@confluent.io>.
Yeah, if you start a vote soon, I think it has a chance to get into 0.10.2.
I guess it's up to Ewen, but I'm happy to help review.

-Jason

On Wed, Jan 4, 2017 at 11:42 AM, Rajini Sivaram <ra...@gmail.com>
wrote:

> Hi Jason,
>
> Yes, we do potentially timeout even before sending pending commits after
> the request timeout (default is > 5 minutes, so this should only happen
> when there are real issues or when brokers are shutdown). I have updated
> the KIP to use a default timeout of 30 seconds for the existing close()
> method.
>
> Since the code changes are limited to the close() code path, can we include
> this in 0.10.2.0? If so, I can initiate the vote tomorrow.
>
> Thank you...

Re: [DISCUSS] KIP-102 - Add close with timeout for consumers

Posted by Rajini Sivaram <ra...@gmail.com>.
Hi Jason,

Yes, we can potentially time out even before sending pending commits once
the request timeout expires (the default is > 5 minutes, so this should only
happen when there are real issues or when brokers are shut down). I have
updated the KIP to use a default timeout of 30 seconds for the existing
close() method.

Since the code changes are limited to the close() code path, can we include
this in 0.10.2.0? If so, I can initiate the vote tomorrow.

Thank you...


On Wed, Jan 4, 2017 at 5:35 PM, Jason Gustafson <ja...@confluent.io> wrote:

> Hi Rajini,
>
> Thanks for the clarification. I looked again at the patch and I see what
> you're saying now. I was confused because I assumed the request timeout was
> being enforced on the requests themselves, but it is more that the request
> timeout bounds the attempt to send them in addition to the time to receive
> a response, right? So it is possible that we timeout before even getting a
> chance to send the OffsetCommit (for example).
>
> I think I'd still prefer timing out quicker by default if possible. The one
> case where it might be worthwhile waiting longer is when there are pending
> offset commits sent through commitSync() or commitAsync(). But if we're not
> actually doing retries or coordinator rediscovery, I'm not sure the
> additional time helps that much.
>
> -Jason

Re: [DISCUSS] KIP-102 - Add close with timeout for consumers

Posted by Jason Gustafson <ja...@confluent.io>.
Hi Rajini,

Thanks for the clarification. I looked again at the patch and I see what
you're saying now. I was confused because I assumed the request timeout was
being enforced on the requests themselves, but it is more that the request
timeout bounds the attempt to send them in addition to the time to receive
a response, right? So it is possible that we time out before even getting a
chance to send the OffsetCommit (for example).

I think I'd still prefer timing out quicker by default if possible. The one
case where it might be worthwhile waiting longer is when there are pending
offset commits sent through commitSync() or commitAsync(). But if we're not
actually doing retries or coordinator rediscovery, I'm not sure the
additional time helps that much.

-Jason
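
A minimal sketch of the case described above, i.e. an asynchronous commit
that may still be in flight when close() is called; the timeout value is
illustrative and the exact completion guarantees are whatever the KIP and
PR end up specifying:

    import java.util.Collections;
    import java.util.Properties;
    import java.util.concurrent.TimeUnit;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class PendingCommitAtClose {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "example-group");
            props.put("enable.auto.commit", "false");
            props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Collections.singletonList("example-topic"));
            consumer.poll(1000);

            // Fire-and-forget commit; the request may still be in flight
            // when close() is called below.
            consumer.commitAsync((offsets, exception) -> {
                if (exception != null)
                    System.err.println("Commit failed: " + exception);
            });

            // A bounded close gives the pending commit a limited window to
            // complete instead of blocking indefinitely.
            consumer.close(30, TimeUnit.SECONDS);
        }
    }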

On Wed, Jan 4, 2017 at 8:27 AM, Rajini Sivaram <ra...@gmail.com>
wrote:

> Hi Jason,
>
> Thank you for the review.
>
> During close(), if there is a rebalance and the coordinator has to be
> rediscovered, close terminates without trying to find the coordinator. The
> poll() loop within close terminates if the coordinator is not known (as it
> does now) or if the timeout expires. At the moment, that timeout is a
> hard-coded 5 second timeout. The PR changes that to min(closeTimeout,
> requestTimeout). So even if there are pending commits, the maximum wait
> will be requestTimeout in the final poll() loop of close().
>
> In addition to this, before the poll loop, there is a
> maybeAutoCommitOffsetsSync(). At the moment, this does not have a timeout
> and can wait indefinitely. The PR introduces a timeout for this commit
> invoked from close(). The timeout is min(closeTimeout, requestTimeout).
> Hence the maximum timeout of (2 * requestTimeout) for any close. Have I
> missed something?
>
> I had chosen Long.MAX_VALUE as default close timeout to be consistent with
> Producer. But perhaps a lower timeout of 30 seconds is more meaningful for
> Consumer since consumer typically has less to do. Even with (2 *
> requestTimeout), the default would be 20 minutes, which is perhaps too high
> anyway. I will update the KIP.

Re: [DISCUSS] KIP-102 - Add close with timeout for consumers

Posted by Rajini Sivaram <ra...@gmail.com>.
Hi Jason,

Thank you for the review.

During close(), if there is a rebalance and the coordinator has to be
rediscovered, close terminates without trying to find the coordinator. The
poll() loop within close terminates if the coordinator is not known (as it
does now) or if the timeout expires. At the moment, that timeout is a
hard-coded 5 second timeout. The PR changes that to min(closeTimeout,
requestTimeout). So even if there are pending commits, the maximum wait
will be requestTimeout in the final poll() loop of close().

In addition to this, before the poll loop, there is a
maybeAutoCommitOffsetsSync(). At the moment, this does not have a timeout
and can wait indefinitely. The PR introduces a timeout for this commit
invoked from close(). The timeout is min(closeTimeout, requestTimeout).
Hence the maximum timeout for any close is (2 * requestTimeout). Have I
missed something?

I had chosen Long.MAX_VALUE as the default close timeout to be consistent
with Producer. But perhaps a lower timeout of 30 seconds is more meaningful
for Consumer, since the consumer typically has less to do. Even with (2 *
requestTimeout), the default would be 20 minutes, which is perhaps too high
anyway. I will update the KIP.
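
To make the two bounds above concrete, here is a rough sketch of the flow
just described. It is not the actual PR code: maybeAutoCommitOffsetsSync is
the method named in this thread, while coordinatorKnown, hasPendingRequests
and pollOnce are placeholder names used only for illustration.

    public class CloseFlowSketch {
        void close(long closeTimeoutMs, long requestTimeoutMs) {
            long boundMs = Math.min(closeTimeoutMs, requestTimeoutMs);

            // Phase 1: synchronous auto-commit, previously unbounded, now
            // given a timeout when invoked from close().
            maybeAutoCommitOffsetsSync(boundMs);

            // Phase 2: poll loop that flushes pending commits and the
            // LeaveGroup request; it exits when the coordinator is unknown
            // or the bound expires. Worst case over both phases: roughly
            // 2 * requestTimeout.
            long deadline = System.currentTimeMillis() + boundMs;
            while (coordinatorKnown() && hasPendingRequests()
                    && System.currentTimeMillis() < deadline) {
                pollOnce(deadline - System.currentTimeMillis());
            }
        }

        void maybeAutoCommitOffsetsSync(long timeoutMs) { /* stub */ }
        boolean coordinatorKnown() { return false; }   // stub
        boolean hasPendingRequests() { return false; } // stub
        void pollOnce(long timeoutMs) { /* stub */ }   // stub
    }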


On Wed, Jan 4, 2017 at 3:16 AM, Jason Gustafson <ja...@confluent.io> wrote:

> Hey Rajini,
>
> Thanks for the KIP. I had a quick look at the patch and the impact doesn't
> seem too bad. Just wanted to clarify one point. This is from the KIP:
>
> The existing close() method without a timeout will attempt to close the
> > consumer gracefully with a timeout of Long.MAX_VALUE. Since commit and
> > leave group requests are timed out after the request timeout, the upper
> > bound will be approximately 2*request.timeout.ms (around 10 minutes by
> > default).
>
>
> I don't think this is quite right. There could be one or more pending
> OffsetCommit requests (sent using commitAsync) that we may have to await.
> We could also be in the middle of a group rebalance. The other complication
> is what happens in the event of a request timeout. Usually the consumer
> will rediscover the coordinator. Would we do that as well in close() and
> retry any failed requests if there is time remaining, or would we just fail
> the remaining requests and return? In any case, it may not be so easy to
> set an upper bound on the default timeout.
>
> With that in mind, I'm wondering whether waiting indefinitely should be the
> default. In the case of the OffsetCommit before closing (when autocommit is
> enabled) or the LeaveGroup, it's more or less OK if these requests fail.
> Maybe we should consider them best effort (as is currently done) and wait a
> reasonable amount of time (say 30 seconds) for their completion. I'd rather
> have "nice" behavior out of the box and let users who want indefinite
> blocking use Long.MAX_VALUE themselves. What do you think?
>
> Thanks,
> Jason

Re: [DISCUSS] KIP-102 - Add close with timeout for consumers

Posted by Jason Gustafson <ja...@confluent.io>.
Hey Rajini,

Thanks for the KIP. I had a quick look at the patch and the impact doesn't
seem too bad. Just wanted to clarify one point. This is from the KIP:

> The existing close() method without a timeout will attempt to close the
> consumer gracefully with a timeout of Long.MAX_VALUE. Since commit and
> leave group requests are timed out after the request timeout, the upper
> bound will be approximately 2*request.timeout.ms (around 10 minutes by
> default).


I don't think this is quite right. There could be one or more pending
OffsetCommit requests (sent using commitAsync) that we may have to await.
We could also be in the middle of a group rebalance. The other complication
is what happens in the event of a request timeout. Usually the consumer
will rediscover the coordinator. Would we do that as well in close() and
retry any failed requests if there is time remaining, or would we just fail
the remaining requests and return? In any case, it may not be so easy to
set an upper bound on the default timeout.

With that in mind, I'm wondering whether waiting indefinitely should be the
default. In the case of the OffsetCommit before closing (when autocommit is
enabled) or the LeaveGroup, it's more or less OK if these requests fail.
Maybe we should consider them best effort (as is currently done) and wait a
reasonable amount of time (say 30 seconds) for their completion. I'd rather
have "nice" behavior out of the box and let users who want indefinite
blocking use Long.MAX_VALUE themselves. What do you think?

Thanks,
Jason
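
A sketch of the two behaviours being weighed here, assuming the
close(long, TimeUnit) form discussed earlier in the thread; the 30-second
figure is the suggested default above, and Long.MAX_VALUE is the opt-in for
callers who really do want to block:

    import java.util.concurrent.TimeUnit;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class CloseDefaultsSketch {
        static void shutdown(KafkaConsumer<String, String> consumer,
                             boolean waitForever) {
            if (waitForever) {
                // Callers that insist on waiting for commits and LeaveGroup
                // opt in to (effectively) indefinite blocking themselves.
                consumer.close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
            } else {
                // Out-of-the-box behaviour under discussion: best-effort
                // commit and LeaveGroup, bounded at roughly 30 seconds.
                consumer.close();
            }
        }
    }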

On Wed, Dec 21, 2016 at 4:39 AM, Rajini Sivaram <ra...@gmail.com>
wrote:

> I have added some more detail to the "Proposed Changes" section. Also
> created a preliminary PR for the JIRA (
> https://github.com/apache/kafka/pull/2285).
>
> I am using *request.timeout.ms* to bound individual requests during close
> (the KIP does not address timeouts in any other code path) to ensure that
> *close()* always completes within a bounded time even when timeout is not
> specified. This is similar to the producer where requests are aborted after
> *request.timeout.ms*. The PR contains unit and integration tests for all
> the close scenarios I could think of (but there could be more).

Re: [DISCUSS] KIP-102 - Add close with timeout for consumers

Posted by Rajini Sivaram <ra...@gmail.com>.
I have added some more detail to the "Proposed Changes" section. Also
created a preliminary PR for the JIRA (
https://github.com/apache/kafka/pull/2285).

I am using *request.timeout.ms* to bound individual requests during close
(the KIP does not address timeouts in any other code path) to ensure that
*close()* always completes within a bounded time even when a timeout is not
specified. This is similar to the producer, where requests are aborted after
*request.timeout.ms*. The PR contains unit and integration tests for all the
close scenarios I could think of (but there could be more).
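
For readers less familiar with the setting, request.timeout.ms is an
existing consumer config; a hedged sketch of how it would interact with the
close timeout per the description in this thread (the values shown are
illustrative only):

    import java.util.Properties;
    import java.util.concurrent.TimeUnit;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class BoundedCloseConfigSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "example-group");
            props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
            // Existing config: each request issued during close() (offset
            // commit, leave group) is abandoned once this much time passes.
            props.put("request.timeout.ms", "40000");

            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            // Per this thread, each phase of close() waits at most
            // min(close timeout, request.timeout.ms) -- here min(60s, 40s).
            consumer.close(60, TimeUnit.SECONDS);
        }
    }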


On Mon, Dec 19, 2016 at 10:32 PM, Guozhang Wang <wa...@gmail.com> wrote:

> +1 on this idea as well.
>
> Streams has also added a similar feature itself partly because consumer
> does not support it directly (other part of the reason is that like
> brokers, streams also have some exception handling logic which could lead
> to deadlock with careless System.exit). For consumer itself I think the
> trickiness lies in the prefetching calls as well as commit / HB requests
> cleanup with the timeout, and I agree with Ewen that it's better to be
> merged in the early release cycle than a last minute merge.
>
>
>
> Guozhang

Re: [DISCUSS] KIP-102 - Add close with timeout for consumers

Posted by Guozhang Wang <wa...@gmail.com>.
+1 on this idea as well.

Streams has also added a similar feature itself, partly because the consumer
does not support it directly (the other part of the reason is that, like
brokers, Streams also has some exception handling logic which could lead to
a deadlock with a careless System.exit). For the consumer itself I think the
trickiness lies in the prefetching calls as well as the cleanup of commit and
heartbeat requests within the timeout, and I agree with Ewen that it's better
to merge this early in the release cycle than at the last minute.



Guozhang
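
The Streams-side feature referred to above appears to be the timed close on
KafkaStreams; a hedged sketch (the timeout value is illustrative, and the
surrounding topology and configuration are omitted):

    import java.util.concurrent.TimeUnit;
    import org.apache.kafka.streams.KafkaStreams;

    public class StreamsTimedCloseSketch {
        static void shutdown(KafkaStreams streams) {
            // Stop the Streams instance within a bound instead of blocking
            // forever in a shutdown hook.
            streams.close(30, TimeUnit.SECONDS);
        }
    }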

On Mon, Dec 19, 2016 at 4:18 AM, Rajini Sivaram <ra...@gmail.com>
wrote:

> Thank you for the reviews.
>
> @Becket @Ewen, Agree that making all blocking calls have a timeout will be
> trickier and hence the scope of this KIP is limited to close().
>
> @Jay Yes, this should definitely go into release notes, will make sure it
> is added. I will add some integration tests with broker failures for
> testing the timeout, but they cannot completely eliminate the risk of a
> hang. Over time, hopefully system tests will help catch most issues.



-- 
-- Guozhang

Re: [DISCUSS] KIP-102 - Add close with timeout for consumers

Posted by Rajini Sivaram <ra...@gmail.com>.
Thank you for the reviews.

@Becket @Ewen, Agree that making all blocking calls have a timeout will be
trickier and hence the scope of this KIP is limited to close().

@Jay Yes, this should definitely go into release notes, will make sure it
is added. I will add some integration tests with broker failures for
testing the timeout, but they cannot completely eliminate the risk of a
hang. Over time, hopefully system tests will help catch most issues.


On Sat, Dec 17, 2016 at 1:15 AM, Jay Kreps <ja...@confluent.io> wrote:

> I think this is great. Sounds like one implication is that existing code
> that called close() and hit the timeout would now hang indefinitely. We saw
> this kind of thing a lot in automated testing scenarios where people don't
> correctly sequence their shutdown of client and server. I think this is
> okay, but might be good to include in the release notes.
>
> -jay



-- 
Regards,

Rajini

Re: [DISCUSS] KIP-102 - Add close with timeout for consumers

Posted by Jay Kreps <ja...@confluent.io>.
I think this is great. Sounds like one implication is that existing code
that called close() and hit the timeout would now hang indefinitely. We saw
this kind of thing a lot in automated testing scenarios where people don't
correctly sequence their shutdown of client and server. I think this is
okay, but might be good to include in the release notes.

-jay
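
The shutdown-sequencing problem described above is commonly handled with
consumer.wakeup(); a minimal sketch of that pattern combined with a bounded
close (the close(long, TimeUnit) form assumes the method added by this KIP,
and all values are placeholders):

    import java.util.Collections;
    import java.util.Properties;
    import java.util.concurrent.TimeUnit;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.errors.WakeupException;

    public class ShutdownHookExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "example-group");
            props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            final Thread mainThread = Thread.currentThread();
            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                // Interrupt a blocking poll() so the main loop exits and
                // closes the consumer itself, instead of racing JVM exit.
                consumer.wakeup();
                try {
                    mainThread.join();
                } catch (InterruptedException ignored) {
                }
            }));

            try {
                consumer.subscribe(Collections.singletonList("example-topic"));
                while (true) {
                    consumer.poll(1000);
                    // ... process records ...
                }
            } catch (WakeupException e) {
                // Expected on shutdown.
            } finally {
                // A bounded close avoids hanging if brokers are already gone.
                consumer.close(30, TimeUnit.SECONDS);
            }
        }
    }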

On Thu, Dec 15, 2016 at 5:32 AM, Rajini Sivaram <rs...@pivotal.io> wrote:

Hi all,

I have just created KIP-102 to add a new close method for consumers with a
timeout parameter, making Consumer consistent with Producer:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-102+-+Add+close+with+timeout+for+consumers

Comments and suggestions are welcome.

Thank you...

Regards,

Rajini