Posted to dev@kafka.apache.org by "Matthias J. Sax" <ma...@confluent.io> on 2017/11/09 11:13:43 UTC

[DISCUSS] KIP-224: Add configuration parameters `retries` and `retry.backoff.ms` to Streams API

Hi,

I want to propose a new KIP to make the Streams API more resilient to broker
disconnections.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-224%3A+Add+configuration+parameters+%60retries%60+and+%60retry.backoff.ms%60+to+Streams+API


-Matthias


Re: [DISCUSS] KIP-224: Add configuration parameters `retries` and `retry.backoff.ms` to Streams API

Posted by "Matthias J. Sax" <ma...@confluent.io>.
One more change: as the parameter is called "retries", the default value
should be zero (instead of one), since the initial attempt is not a retry.
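To make the difference between "retries" and "attempts" concrete, here is a minimal Java sketch. It assumes the usual Kafka client convention that `retries` counts re-attempts after the first try, so 0 means exactly one attempt. `BoundedRetry` and `callWithRetries` are hypothetical names for illustration only; they are not part of the Kafka Streams API or the PR.

```java
import java.util.concurrent.Callable;

/**
 * Hypothetical helper (not Kafka code): runs an action once and, on failure,
 * re-attempts it up to `retries` more times, sleeping `retryBackoffMs`
 * between attempts.
 */
final class BoundedRetry {

    private BoundedRetry() {}

    static <T> T callWithRetries(Callable<T> action, int retries, long retryBackoffMs)
            throws Exception {
        if (retries < 0) {
            throw new IllegalArgumentException("retries must be >= 0");
        }
        Exception last = null;
        // One initial attempt plus `retries` re-attempts.
        for (int attempt = 0; attempt <= retries; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                last = e;
                if (attempt < retries) {
                    Thread.sleep(retryBackoffMs); // back off before the next attempt
                }
            }
        }
        throw last; // every attempt failed: surface the last error
    }
}
```

With `retries = 0` the action runs once and its error is rethrown immediately, which is the fail-fast behavior the default is meant to preserve.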

-Matthias



Re: [DISCUSS] KIP-224: Add configuration parameters `retries` and `retry.backoff.ms` to Streams API

Posted by Guozhang Wang <wa...@gmail.com>.
Thanks Matthias. LGTM.

-- 
-- Guozhang

Re: [DISCUSS] KIP-224: Add configuration parameters `retries` and `retry.backoff.ms` to Streams API

Posted by "Matthias J. Sax" <ma...@confluent.io>.
I had a look into the code and here are my observations:

1) We use a hard-coded retry count of 5 for both (note: MAX is a static
final constant in the global state store code, also set to 5).

2) We already have code in the main loop that handles LockExceptions.

3) For a global store there is actually no lock contention, as only a
single thread will hold this lock, and ownership is never transferred
from one thread to another (in contrast to regular stores, which might be
moved between threads during a rebalance).


Thus, my conclusion is that we actually don't need any explicit retry
for getting state locks. For "regular" stores we have indirect retries
via the main loop, and for global stores not getting a lock is actually a
fatal error that is not retryable.
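A rough Java sketch of the two cases above, using simplified stand-in types rather than the actual Kafka Streams internals: for regular tasks a failed lock attempt just leaves the task pending, so the next main-loop iteration retries it; for the global store a failed lock is treated as fatal. `LockHandlingSketch`, `LockFailure`, and both methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Predicate;

final class LockHandlingSketch {

    private LockHandlingSketch() {}

    /** Hypothetical stand-in for a fatal state-directory lock failure. */
    static final class LockFailure extends RuntimeException {
        LockFailure(String message) {
            super(message);
        }
    }

    /**
     * One pass of a simplified main loop over regular (non-global) tasks.
     * A task whose lock is still held elsewhere goes back into the queue and
     * is retried on the next pass; there is no explicit inner retry loop.
     */
    static List<String> initializePendingTasks(Queue<String> pending, Predicate<String> tryLock) {
        List<String> initialized = new ArrayList<>();
        int toVisit = pending.size(); // visit each currently pending task once
        for (int i = 0; i < toVisit; i++) {
            String taskId = pending.poll();
            if (tryLock.test(taskId)) {
                initialized.add(taskId);
            } else {
                pending.add(taskId); // lock not free yet: retry in the next loop iteration
            }
        }
        return initialized;
    }

    /** Global store: only one thread ever holds this lock, so failure is not retried. */
    static void lockGlobalStore(boolean lockAcquired) {
        if (!lockAcquired) {
            throw new LockFailure("could not lock global state directory");
        }
    }
}
```

The design point is that the retry for regular stores comes for free from the loop structure, so no "retries" setting is needed there.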


I updated the KIP and PR accordingly, i.e., we don't need to consider the
newly added "retries" parameter for state directory locking.

Let me know what you think about this. I'll start the vote thread in
parallel.


-Matthias



Re: [DISCUSS] KIP-224: Add configuration parameters `retries` and `retry.backoff.ms` to Streams API

Posted by Guozhang Wang <wa...@gmail.com>.
For state lock retries, I realized that for both global state stores and
local state stores we are using hard-coded retries today (different values
though: 5 and MAX).

Should we only use retries for global state stores, and use 0 for local
state stores to fall back to the main loop?


Guozhang


-- 
-- Guozhang

Re: [DISCUSS] KIP-224: Add configuration parameters `retries` and `retry.backoff.ms` to Streams API

Posted by Bill Bejeck <bb...@gmail.com>.
Overall I'd agree with re-using the parameter for state lock retries.

Would there ever be a case where you'd need to have them be different
values?


Re: [DISCUSS] KIP-224: Add configuration parameters `retries` and `retry.backoff.ms` to Streams API

Posted by Ted Yu <yu...@gmail.com>.
bq. it would be worth to reuse both parameters for those

I agree.


Re: [DISCUSS] KIP-224: Add configuration parameters `retries` and `retry.backoff.ms` to Streams API

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Thanks for the feedback. Typos fixed.

Damian already explained why we need the new strategy.

@Kamal: many users don't want to retry but want to fail the Kafka Streams
instance in case of an error. All default parameters are chosen to
follow this pattern (similar to consumer/producer/broker defaults). The
KIP aims to allow users to reconfigure Kafka Streams to be resilient
against errors. It's the user's choice to change configs to get better
resilience.
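For illustration, a user opting into resilience might override both parameters as sketched below. This assumes the plain string keys "retries" and "retry.backoff.ms" named in this KIP; the values are arbitrary examples, and `ResilientStreamsConfig` is a hypothetical helper, not part of Kafka.

```java
import java.util.Properties;

/** Hypothetical helper that builds Streams properties opting into retries. */
final class ResilientStreamsConfig {

    private ResilientStreamsConfig() {}

    static Properties build(String applicationId, String bootstrapServers,
                            int retries, long retryBackoffMs) {
        Properties props = new Properties();
        props.put("application.id", applicationId);
        props.put("bootstrap.servers", bootstrapServers);
        // Leaving this unset keeps the fail-fast default (0 retries).
        props.put("retries", Integer.toString(retries));
        // Pause between attempts; this parameter already exists in StreamsConfig.
        props.put("retry.backoff.ms", Long.toString(retryBackoffMs));
        return props;
    }
}
```

The resulting `Properties` would be passed to the `KafkaStreams` constructor as usual; only users who set these values explicitly give up the fail-fast behavior.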


Update:

While I was working on the PR, I realized that the parameter
"retry.backoff.ms" is already available in StreamsConfig. I updated the
KIP accordingly.

I also discovered that we have a hard-coded number of retries for state
locks -- I think it would be worth to reuse both parameters for those,
too. WDYT?

Here is the current PR: https://github.com/apache/kafka/pull/4206


-Matthias



Re: [DISCUSS] KIP-224: Add configuration parameters `retries` and `retry.backoff.ms` to Streams API

Posted by Guozhang Wang <wa...@gmail.com>.
Damian,

You are right! I was dreaming at the wrong class :)

Guozhang


On Thu, Nov 9, 2017 at 11:27 AM, Damian Guy <da...@gmail.com> wrote:

> Guozhang, I'm not sure I follow... Global stores aren't per task, they are
> per application instance and should be fully restored before the stream
> threads start processing. They don't go through a rebalance as it is manual
> assignment of all partitions in the topic.
>
> On Thu, 9 Nov 2017 at 17:43 Guozhang Wang <wa...@gmail.com> wrote:
>
> > Instead of restoring the global store during registration, could we also
> > do this after the rebalance callback as in the main loop? By doing this
> > we can effectively swallow-and-retry-in-next-loop as we did for
> > non-global stores. Since global stores are per task not per thread, we
> > would not process the task after the global store is bootstrapped fully.
> >
> > Guozhang
> >
> > On Thu, Nov 9, 2017 at 7:08 AM, Bill Bejeck <bb...@gmail.com> wrote:
> >
> > > Thanks for the KIP Matthias, +1 from me.
> > >
> > > -Bill
> > >
> > > On Thu, Nov 9, 2017 at 8:40 AM, Ted Yu <yu...@gmail.com> wrote:
> > >
> > > > lgtm
> > > >
> > > > bq. pass both parameter
> > > >
> > > > parameter should be in plural.
> > > > Same with 'two new configuration parameter'
> > > >
> > > > Cheers
> > > >
> > > > On Thu, Nov 9, 2017 at 4:20 AM, Damian Guy <da...@gmail.com> wrote:
> > > >
> > > > > Thanks Matthias, LGTM



-- 
-- Guozhang

Re: [DISCUSS] KIP-224: Add configuration parameters `retries` and `retry.backoff.ms` to Streams API

Posted by Kamal <ka...@gmail.com>.
Matthias,

Could you describe how the current 'fail-fast' strategy (default: 1
retry) is useful, given that it kills the stream instance?
Is the plan to retry indefinitely, or will users need to configure it?
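For reference, here is a minimal sketch of the retry semantics under discussion. It assumes, as Matthias proposes later in this thread, that `retries` counts additional attempts after the first one (so a default of 0 means fail fast) and that `retry.backoff.ms` is the pause between attempts. `RetrySketch` and `callWithRetries` are hypothetical names used only for illustration, not Kafka Streams internals; only the two config key names come from the KIP.

```java
import java.util.Properties;
import java.util.concurrent.Callable;

public class RetrySketch {

    /**
     * Hypothetical helper: runs the action once, then retries it up to
     * `retries` more times, sleeping `retryBackoffMs` between attempts.
     * With retries == 0 the first failure is rethrown immediately (fail fast).
     */
    static <T> T callWithRetries(Callable<T> action, int retries, long retryBackoffMs) throws Exception {
        int attempt = 0;
        while (true) {
            try {
                return action.call();
            } catch (Exception e) {
                if (attempt >= retries) {
                    throw e; // retries exhausted: surface the error to the caller
                }
                attempt++;
                Thread.sleep(retryBackoffMs); // back off before the next attempt
            }
        }
    }

    public static void main(String[] args) throws Exception {
        // Config keys as named in the KIP title; the values are illustrative only.
        Properties props = new Properties();
        props.put("retries", "3");
        props.put("retry.backoff.ms", "100");

        int retries = Integer.parseInt(props.getProperty("retries", "0"));
        long backoffMs = Long.parseLong(props.getProperty("retry.backoff.ms", "100"));

        // Simulated broker call that fails twice, then succeeds.
        int[] calls = {0};
        String result = callWithRetries(() -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("broker unavailable");
            }
            return "ok";
        }, retries, backoffMs);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

With `retries` left at 0, the first exception propagates immediately, which is the fail-fast behavior; raising it trades some latency for resilience against transient broker disconnects.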

On Fri, Nov 10, 2017 at 12:57 AM, Damian Guy <da...@gmail.com> wrote:

> Guozhang, I'm not sure I follow... Global stores aren't per task; they are
> per application instance and should be fully restored before the stream
> threads start processing. They don't go through a rebalance as it is manual
> assignment of all partitions in the topic.

Re: [DISCUSS] KIP-224: Add configuration parameters `retries` and `retry.backoff.ms` to Streams API

Posted by Damian Guy <da...@gmail.com>.
Guozhang, I'm not sure I follow... Global stores aren't per task; they are
per application instance and should be fully restored before the stream
threads start processing. They don't go through a rebalance as it is manual
assignment of all partitions in the topic.

On Thu, 9 Nov 2017 at 17:43 Guozhang Wang <wa...@gmail.com> wrote:

> Instead of restoring the global store during registration, could we also do
> this after the rebalance callback as in the main loop? By doing this we can
> effectively swallow-and-retry-in-next-loop as we did for non-global stores.
> Since global stores are per task not per thread, we would not process the
> task after the global store is bootstrapped fully.
>
>
> Guozhang

Re: [DISCUSS] KIP-224: Add configuration parameters `retries` and `retry.backoff.ms` to Streams API

Posted by Guozhang Wang <wa...@gmail.com>.
Instead of restoring the global store during registration, could we also do
this after the rebalance callback as in the main loop? By doing this we can
effectively swallow-and-retry-in-next-loop as we did for non-global stores.
Since global stores are per task, not per thread, we would not process the
task until the global store is fully bootstrapped.


Guozhang

On Thu, Nov 9, 2017 at 7:08 AM, Bill Bejeck <bb...@gmail.com> wrote:

> Thanks for the KIP Matthias, +1 from me.
>
>
> -Bill

-- 
-- Guozhang
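Guozhang's "swallow-and-retry-in-next-loop" suggestion can be illustrated with a small simulation. This is a hypothetical sketch, not Kafka Streams' actual restoration code: `RestoreLoopSketch`, `restoreStep`, and `TransientBrokerException` are invented names, and the "broker" is just a counter that fails every Nth call. As Damian points out in his reply, global stores are per application instance and are restored before the stream threads start, so this only shows the proposed pattern.

```java
import java.util.ArrayList;
import java.util.List;

public class RestoreLoopSketch {

    /** Hypothetical stand-in for a retriable error such as a broker disconnect. */
    static class TransientBrokerException extends RuntimeException {
        TransientBrokerException(String msg) { super(msg); }
    }

    private final int totalRecords;
    private final int failEveryNthCall; // simulates intermittent broker failures (must be > 0)
    private int restored = 0;
    private int calls = 0;
    final List<String> log = new ArrayList<>();

    RestoreLoopSketch(int totalRecords, int failEveryNthCall) {
        this.totalRecords = totalRecords;
        this.failEveryNthCall = failEveryNthCall;
    }

    boolean restoreComplete() {
        return restored >= totalRecords;
    }

    /** Restores one record per call; every Nth call fails as if the broker dropped. */
    private void restoreStep() {
        calls++;
        if (calls % failEveryNthCall == 0) {
            throw new TransientBrokerException("disconnected on call " + calls);
        }
        restored++;
    }

    /** One iteration of the (hypothetical) main processing loop. */
    void runOnce() {
        if (!restoreComplete()) {
            try {
                restoreStep();
            } catch (TransientBrokerException e) {
                // Swallow the transient error and retry in the next loop iteration
                // instead of letting it kill the thread.
                log.add("swallowed: " + e.getMessage());
            }
            return; // do not process the task until restoration has finished
        }
        log.add("processing");
    }

    public static void main(String[] args) {
        RestoreLoopSketch loop = new RestoreLoopSketch(3, 2);
        for (int i = 0; i < 6; i++) {
            loop.runOnce();
        }
        loop.log.forEach(System.out::println);
    }
}
```

Catching only the transient exception type keeps genuine bugs fatal, while the early `return` guarantees that no processing happens before restoration completes.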

Re: [DISCUSS] KIP-224: Add configuration parameters `retries` and `retry.backoff.ms` to Streams API

Posted by Bill Bejeck <bb...@gmail.com>.
Thanks for the KIP, Matthias. +1 from me.


-Bill

On Thu, Nov 9, 2017 at 8:40 AM, Ted Yu <yu...@gmail.com> wrote:

> lgtm
>
> bq. pass both parameter
>
> parameter should be in plural.
> Same with 'two new configuration parameter'
>
> Cheers

Re: [DISCUSS] KIP-224: Add configuration parameters `retries` and `retry.backoff.ms` to Streams API

Posted by Ted Yu <yu...@gmail.com>.
LGTM

bq. pass both parameter

"parameter" should be plural ("parameters").
The same goes for 'two new configuration parameter'.

Cheers

On Thu, Nov 9, 2017 at 4:20 AM, Damian Guy <da...@gmail.com> wrote:

> Thanks Matthias, LGTM

Re: [DISCUSS] KIP-224: Add configuration parameters `retries` and `retry.backoff.ms` to Streams API

Posted by Damian Guy <da...@gmail.com>.
Thanks, Matthias. LGTM.

On Thu, 9 Nov 2017 at 11:13 Matthias J. Sax <ma...@confluent.io> wrote:

> Hi,
>
> I want to propose a new KIP to make Streams API more resilient to broker
> disconnections.
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-224%3A+Add+configuration+parameters+%60retries%60+and+%60retry.backoff.ms%60+to+Streams+API
>
>
> -Matthias
>
>