Posted to dev@kafka.apache.org by "Matthias J. Sax" <mj...@apache.org> on 2020/09/24 00:50:58 UTC

Re: [VOTE] KIP-572: Improve timeouts and retries in Kafka Streams

Sorry for the late reply Jason, and thanks for calling it out.

After a couple of direct discussions with Jason, Colin, Ismael, and John, I
agree that we should keep the `retries` config for both the producer and
admin client.

I updated the KIP accordingly and opened a PR to revert the deprecation:
https://github.com/apache/kafka/pull/9333


-Matthias

On 7/27/20 12:45 PM, Jason Gustafson wrote:
> Hi Matthias,
> 
> Sorry for jumping in so late here. I am trying to understand why it was
> necessary to deprecate `retries` in the producer. One of the use cases that
> I see in practice is setting `retries` to 0. This allows applications to
> control the retry semantics themselves. For example, I have seen this in
> flume. As far as I can tell, once `retries` is gone, we will not have a way
> to do the same thing. The best we can suggest to users is to enable
> idempotence so that any retries will not cause duplicates. My concern is
> that this is going to slow client upgrades with little benefit in return.
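
The use case described above (the producer never retries, the application decides) can be sketched roughly as follows. This is a minimal illustration, not code from Flume or from the Kafka clients: only the config key `retries` is a real producer setting, while the class `AppLevelRetry`, the helper `sendWithRetries`, and the simulated send are hypothetical stand-ins so the example runs without a broker.

```java
import java.util.Properties;
import java.util.function.Supplier;

class AppLevelRetry {
    // Producer settings that hand retry control to the application.
    static Properties producerProps() {
        Properties props = new Properties();
        props.setProperty("retries", "0"); // the producer itself never re-sends
        return props;
    }

    // Hypothetical helper: call the given send up to maxAttempts times and
    // rethrow the last failure if every attempt fails.
    static <T> T sendWithRetries(Supplier<T> send, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return send.get();
            } catch (RuntimeException e) {
                last = e; // the application decides here whether and when to retry
            }
        }
        throw last;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated send that fails twice before it succeeds.
        String result = sendWithRetries(() -> {
            if (++calls[0] < 3) {
                throw new RuntimeException("transient failure");
            }
            return "acked";
        }, 5);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

In a real application the `Supplier` would wrap a blocking `send(...).get()` call on a producer built from `producerProps()`; that wiring is left out here on purpose.
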
> 
> Thanks,
> Jason
> 
> 
> 
> On Mon, Jul 20, 2020 at 2:40 PM Matthias J. Sax <mj...@apache.org> wrote:
> 
>> While working on the PR, we realized that the command line tool
>>
>>   bin/kafka-console-producer.sh
>>
>> has a flag `--message-send-max-retries` to set the producer's `retries`
>> config. We also need to deprecate this flag.
>>
>> I updated the KIP accordingly. Please let us know if there are any
>> concerns to this minor change to the KIP.
>>
>> Thanks.
>>
>>
>> -Matthias
>>
>> On 6/10/20 11:16 AM, Matthias J. Sax wrote:
>>> Thanks!
>>>
>>> +1 (binding) from myself.
>>>
>>>
>>> I am closing the vote as accepted with 3 binding and 3 non-binding votes.
>>>
>>> binding:
>>>  - John
>>>  - Guozhang
>>>  - Matthias
>>>
>>> non-binding:
>>>  - Sophie
>>>  - Boyang
>>>  - Bruno
>>>
>>>
>>>
>>> -Matthias
>>>
>>> On 6/4/20 5:26 PM, Matthias J. Sax wrote:
>>>> Guozhang,
>>>>
>>>> what you propose makes sense, but this seems to get into implementation
>>>> detail territory already?
>>>>
>>>> Thus, if there are no further change requests to the KIP wiki page
>>>> itself, I would like to proceed with the VOTE.
>>>>
>>>>
>>>> -Matthias
>>>>
>>>> On 5/20/20 12:30 PM, Guozhang Wang wrote:
>>>>> Thanks Matthias,
>>>>>
>>>>> I agree with you on all the bullet points above. Regarding the
>> admin-client
>>>>> outer-loop retries inside partition assignor, I think we should treat
>> error
>>>>> codes differently from those two blocking calls:
>>>>>
>>>>> Describe-topic:
>>>>> * unknown-topic (3): add this topic to the to-be-created topic list.
>>>>> * leader-not-available (5): do not try to create, retry in the outer
>> loop.
>>>>> * request-timeout: break the current loop and retry in the outer loop.
>>>>> * others: fatal error.
>>>>>
>>>>> Create-topic:
>>>>> * topic-already-exists: retry in the outer loop to validate the
>>>>> num.partitions match expectation.
>>>>> * request-timeout: break the current loop and retry in the outer loop.
>>>>> * others: fatal error.
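
The two lists above can be read as a small decision table. A minimal sketch, assuming hypothetical enum names (`Code`, `Action`) rather than the real Kafka error classes, and not taken from the actual `InternalTopicManager` code:

```java
class TopicErrorHandling {
    // Hypothetical labels for the broker responses named in the lists.
    enum Code { NONE, UNKNOWN_TOPIC, LEADER_NOT_AVAILABLE, REQUEST_TIMEOUT, TOPIC_ALREADY_EXISTS, OTHER }

    // What the assignor would do next for one topic.
    enum Action { DONE, CREATE_TOPIC, RETRY_IN_OUTER_LOOP, FAIL }

    // Describe-topic: unknown topics get created, transient errors go back
    // to the outer loop, everything else is fatal.
    static Action onDescribe(Code code) {
        switch (code) {
            case NONE:
                return Action.DONE;
            case UNKNOWN_TOPIC:
                return Action.CREATE_TOPIC;
            case LEADER_NOT_AVAILABLE:
            case REQUEST_TIMEOUT:
                return Action.RETRY_IN_OUTER_LOOP;
            default:
                return Action.FAIL;
        }
    }

    // Create-topic: "already exists" is retried in the outer loop so that the
    // next describe can validate the partition count.
    static Action onCreate(Code code) {
        switch (code) {
            case NONE:
                return Action.DONE;
            case TOPIC_ALREADY_EXISTS:
            case REQUEST_TIMEOUT:
                return Action.RETRY_IN_OUTER_LOOP;
            default:
                return Action.FAIL;
        }
    }
}
```
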
>>>>>
>>>>> And in the outer-loop, I think we can have a global timer for the whole
>>>>> "assign()" function, not only for the internal-topic-manager, and the
>> timer
>>>>> can be hard-coded with, e.g. half of the rebalance.timeout to get rid
>> of
>>>>> the `retries`; if we cannot complete the assignment before the timeout
>> runs
>>>>> out, we can return just the partial assignment (e.g. if there are two
>>>>> tasks, but we can only get the topic metadata for one of them, then
>> just do
>>>>> the assignment for that one only) while encoding in the error-code
>> field to
>>>>> request for another rebalance.
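
The global timer with a partial assignment could look roughly like this. It is only a sketch under stated assumptions: the class `AssignDeadlineSketch`, the `Result` type, and the `metadataReady` predicate are hypothetical, the clock is injected so the loop can be exercised without waiting, and a real implementation would back off between rounds instead of spinning.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.LongSupplier;
import java.util.function.Predicate;

class AssignDeadlineSketch {
    static final class Result {
        final List<String> assignedTasks;   // tasks whose topic metadata was available
        final boolean followUpRebalance;    // true if some tasks were left out

        Result(List<String> assignedTasks, boolean followUpRebalance) {
            this.assignedTasks = assignedTasks;
            this.followUpRebalance = followUpRebalance;
        }
    }

    static Result assign(List<String> tasks, Predicate<String> metadataReady,
                         long rebalanceTimeoutMs, LongSupplier clockMs) {
        // Hard-coded budget: half of the rebalance timeout.
        long deadlineMs = clockMs.getAsLong() + rebalanceTimeoutMs / 2;
        List<String> pending = new ArrayList<>(tasks);
        List<String> assigned = new ArrayList<>();
        // do-while so that every task is tried at least once.
        do {
            for (Iterator<String> it = pending.iterator(); it.hasNext(); ) {
                String task = it.next();
                if (metadataReady.test(task)) {
                    assigned.add(task);
                    it.remove();
                }
            }
        } while (!pending.isEmpty() && clockMs.getAsLong() < deadlineMs);
        // Leftover tasks mean: return the partial assignment and ask for another rebalance.
        return new Result(assigned, !pending.isEmpty());
    }
}
```
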
>>>>>
>>>>> Guozhang
>>>>>
>>>>>
>>>>>
>>>>> On Mon, May 18, 2020 at 7:26 PM Matthias J. Sax <mj...@apache.org>
>> wrote:
>>>>>
>>>>>> No worries Guozhang, any feedback is always very welcome! My reply is
>>>>>> going to be a little longer... Sorry.
>>>>>>
>>>>>>
>>>>>>
>>>>>>> 1) There are some inconsistent statements in the proposal regarding
>> what
>>>>>> to
>>>>>>> deprecate:
>>>>>>
>>>>>> The proposal of the KIP is to deprecate `retries` for producer, admin,
>>>>>> and Streams. Maybe the confusion is about the dependency of those
>>>>>> settings within Streams and that we handle the deprecation somewhat
>>>>>> different for plain clients vs Streams:
>>>>>>
>>>>>> For plain producer/admin the default `retries` is set to MAX_VALUE.
>> The
>>>>>> config will be deprecated but still be respected.
>>>>>>
>>>>>> For Streams, the default `retries` is set to zero, however, this
>> default
>>>>>> retry does _not_ affect the embedded producer/admin clients -- both
>>>>>> clients stay on their own default of MAX_VALUE.
>>>>>>
>>>>>> Currently, this introduces the issue, that if a user wants to increase
>>>>>> Streams retries, she might by accident reduce the embedded client
>>>>>> retries, too. To avoid this issue, she would need to set
>>>>>>
>>>>>> retries=100
>>>>>> producer.retries=MAX_VALUE
>>>>>> admin.retries=MAX_VALUE
>>>>>>
>>>>>> This KIP will fix this issue only in the long term though, ie, when
>>>>>> `retries` is finally removed. Short term, using `retries` in
>>>>>> StreamsConfig would still affect the embedded clients, but Streams, as
>>>>>> well as both clients, would log a WARN message. This preserves backward
>>>>>> compatibility.
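
The interaction between the unprefixed `retries` and the `producer.` / `admin.` prefixes can be modeled in a few lines. This is a simplified sketch of the override rule, not the actual `StreamsConfig` code; the class `PrefixedConfigSketch` and the helper `clientConfig` are hypothetical, and `2147483647` stands in for MAX_VALUE.

```java
import java.util.HashMap;
import java.util.Map;

class PrefixedConfigSketch {
    static final String[] CLIENT_PREFIXES = {"producer.", "admin.", "consumer."};

    private static boolean hasClientPrefix(String key) {
        for (String prefix : CLIENT_PREFIXES) {
            if (key.startsWith(prefix)) {
                return true;
            }
        }
        return false;
    }

    // Simplified model: unprefixed keys reach every embedded client, and keys
    // carrying the given client prefix override them for that client only.
    static Map<String, String> clientConfig(Map<String, String> streamsProps, String prefix) {
        Map<String, String> result = new HashMap<>();
        for (Map.Entry<String, String> e : streamsProps.entrySet()) {
            if (!hasClientPrefix(e.getKey())) {
                result.put(e.getKey(), e.getValue());
            }
        }
        for (Map.Entry<String, String> e : streamsProps.entrySet()) {
            if (e.getKey().startsWith(prefix)) {
                result.put(e.getKey().substring(prefix.length()), e.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("retries", "100");
        props.put("producer.retries", "2147483647");
        // The producer keeps its large value; the admin client, lacking an
        // override, silently picks up the Streams-level value.
        System.out.println("producer retries = " + clientConfig(props, "producer.").get("retries"));
        System.out.println("admin retries    = " + clientConfig(props, "admin.").get("retries"));
    }
}
```
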
>>>>>>
>>>>>> Within Streams, `retries` is ignored and the new `task.timeout.ms` is
>>>>>> used instead. This increases the default resilience of Kafka Streams
>>>>>> itself. We could also achieve this by still respecting `retries` and
>>>>>> changing its default value. However, because we deprecate `retries`, it
>>>>>> seems better to just ignore it and switch to the new config directly.
>>>>>>
>>>>>> I updated the KIPs with some more details.
>>>>>>
>>>>>>
>>>>>>
>>>>>>> 2) We should also document the related behavior change in
>>>>>> PartitionAssignor
>>>>>>> that uses AdminClient.
>>>>>>
>>>>>> This is actually a good point. Originally, I looked into this only
>>>>>> briefly, but it raised an interesting question on how to handle it.
>>>>>>
>>>>>> Note that `TimeoutExceptions` are currently not handled in this retry
>>>>>> loop. Also note that the retries value for other errors would be
>>>>>> MAX_VALUE by default (inherited from `AdminClient#retries`, as
>>>>>> mentioned already by Guozhang).
>>>>>>
>>>>>> Applying the new `task.timeout.ms` config does not seem to be
>>>>>> appropriate because the AdminClient is used during a rebalance in the
>>>>>> leader. We could introduce a new config just for this case, but it
>> seems
>>>>>> to be a little bit too much. Furthermore, the group-coordinator
>> applies
>>>>>> a timeout on the leader anyway: if the assignment is not computed
>> within
>>>>>> the timeout, the leader is removed from the group and another
>> rebalance
>>>>>> is triggered.
>>>>>>
>>>>>> Overall, we make multiple admin client calls and thus we should keep
>>>>>> some retry logic and not just rely on the admin client's internal
>>>>>> retries, as those cannot retry different calls in an interleaved
>>>>>> fashion. We could
>>>>>> just retry infinitely and rely on the group coordinator to remove the
>>>>>> leader eventually. However, this does not seem to be ideal because the
>>>>>> removed leader might be stuck forever.
>>>>>>
>>>>>> The question though is: if topic metadata cannot be obtained or
>> internal
>>>>>> topics cannot be created, what should we do? We can't compute an
>>>>>> assignment anyway. We already have a rebalance error code to shut
>> down
>>>>>> all instances for this case. Maybe we could break the retry loop
>> before
>>>>>> the leader is kicked out of the group and send this error code? This
>> way
>>>>>> we don't need a new config, but piggy-back on the existing timeout to
>>>>>> compute the assignment. To be conservative, we could use a 50%
>> threshold?
>>>>>>
>>>>>>
>>>>>>
>>>>>>> BTW as I mentioned in the previous statement, today throwing an
>> exception
>>>>>>> that kills one thread but not the whole instance is still an issue
>> for
>>>>>>> monitoring purposes, but I suppose this is not going to be in this
>> KIP
>>>>>> but
>>>>>>> addressed by another KIP, right?
>>>>>>
>>>>>> Correct. This issue is out of scope.
>>>>>>
>>>>>>
>>>>>>
>>>>>>> for consumer, if it gets a
>>>>>>> TimeoutException polling records, would we start timing all tasks since
>> that
>>>>>>> single consumer would affect all tasks?
>>>>>>
>>>>>> Consumer#poll() would never throw a `TimeoutException` and thus
>>>>>> `task.timeout.ms` does not apply.
>>>>>>
>>>>>>
>>>>>>
>>>>>>> For other blocking calls like
>>>>>>> `endOffsets()` etc, they are usually also issued on behalf of a
>> batch of
>>>>>>> tasks, so if that gets timeout exception should we start ticking all
>> the
>>>>>>> corresponding tasks as well? Maybe worth clarifying a bit more in the
>>>>>> wiki.
>>>>>>
>>>>>> Good point. I agree that the timer should tick for all affected
>> tasks. I
>>>>>> clarified in the KIP.
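
A per-task timer that ticks for every task affected by a timed-out batch call could be tracked like this. It is a minimal sketch, not the Kafka Streams implementation: the class `TaskTimeoutTracker` and its methods are hypothetical, and the behavior for a timeout of `0` (the first timeout is already fatal) is an assumption about how `task.timeout.ms` is meant to work.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

class TaskTimeoutTracker {
    private final long taskTimeoutMs;
    // taskId -> point in time at which the task's timeout budget is used up
    private final Map<String, Long> deadlines = new HashMap<>();

    TaskTimeoutTracker(long taskTimeoutMs) {
        this.taskTimeoutMs = taskTimeoutMs;
    }

    // Called when a blocking call issued on behalf of a batch of tasks times out.
    // Starts the timer for each affected task on its first timeout and returns
    // false once any affected task has used up its budget.
    boolean onTimeout(Collection<String> affectedTasks, long nowMs) {
        boolean withinBudget = true;
        for (String task : affectedTasks) {
            long deadline = deadlines.computeIfAbsent(task, id -> nowMs + taskTimeoutMs);
            if (nowMs >= deadline) {
                withinBudget = false;
            }
        }
        return withinBudget;
    }

    // Called when a task makes progress again; its timer is reset.
    void onProgress(String task) {
        deadlines.remove(task);
    }
}
```

A caller would retry while `onTimeout` returns `true` and raise the error to the user once it returns `false`.
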
>>>>>>
>>>>>>
>>>>>>
>>>>>> About KAFKA-6520:
>>>>>>
>>>>>> There is already KIP-457 and I am not sure this KIP should subsume it.
>>>>>> It somewhat feels orthogonal. I am also not 100% sure if KIP-572
>>>>>> actually helps much, because a thread could be disconnected from the
>>>>>> brokers without throwing any timeout exception: if all tasks are
>> RUNNING
>>>>>> and just polling for new data, but no progress is made because of a
>>>>>> network issue, `poll()` would just return no data but not throw.
>>>>>>
>>>>>>
>>>>>>
>>>>>> @Bruno
>>>>>>
>>>>>>> Wouldn't it be better to specify
>>>>>>> task.timeout.ms to -1 if no retry should be done
>>>>>>
>>>>>> Interesting idea. Personally I find `-1` confusing. And it seems
>>>>>> intuitive to me that `0` implies no retries (this seems to be in
>>>>>> alignment to other APIs).
>>>>>>
>>>>>>
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>>
>>>>>> On 5/18/20 9:53 AM, Guozhang Wang wrote:
>>>>>>> Hi Matthias,
>>>>>>>
>>>>>>> Sorry for flooding the thread, but with this KIP I feel the design
>> scope
>>>>>> of
>>>>>>> https://issues.apache.org/jira/browse/KAFKA-6520 can be simplified
>> a lot
>>>>>>> and maybe the design can just be piggy-backed as part of this KIP,
>> wdyt?
>>>>>>>
>>>>>>> Guozhang
>>>>>>>
>>>>>>>
>>>>>>> On Mon, May 18, 2020 at 9:47 AM Guozhang Wang <wa...@gmail.com>
>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Matthias,
>>>>>>>>
>>>>>>>> Just to add one more meta comment: for consumer, if it gets a
>>>>>>>> TimeoutException polling records, would we start timing all tasks since
>>>>>> that
>>>>>>>> single consumer would affect all tasks? For other blocking calls
>> like
>>>>>>>> `endOffsets()` etc, they are usually also issued on behalf of a
>> batch of
>>>>>>>> tasks, so if that gets timeout exception should we start ticking
>> all the
>>>>>>>> corresponding tasks as well? Maybe worth clarifying a bit more in
>> the
>>>>>> wiki.
>>>>>>>>
>>>>>>>>
>>>>>>>> Guozhang
>>>>>>>>
>>>>>>>> On Mon, May 18, 2020 at 12:26 AM Bruno Cadonna <br...@confluent.io>
>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Matthias,
>>>>>>>>>
>>>>>>>>> I am +1 (non-binding) on the KIP.
>>>>>>>>>
>>>>>>>>> Just one final remark: Wouldn't it be better to specify
>>>>>>>>> task.timeout.ms to -1 if no retry should be done? IMO it would
>> make
>>>>>>>>> the config more intuitive because 0 would not have two possible
>>>>>>>>> meanings (i.e. try once and never try) anymore.
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Bruno
>>>>>>>>>
>>>>>>>>> On Sat, May 16, 2020 at 7:51 PM Guozhang Wang <wa...@gmail.com>
>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>> Hello Matthias,
>>>>>>>>>>
>>>>>>>>>> Thanks for the updated KIP, overall I'm +1 on this proposal. Some
>>>>>> minor
>>>>>>>>>> comments (I know gmail mixed that again for me so I'm leaving it
>> as a
>>>>>>>>> combo
>>>>>>>>>> for both DISCUSS and VOTE :)
>>>>>>>>>>
>>>>>>>>>> 1) There are some inconsistent statements in the proposal
>> regarding
>>>>>>>>> what to
>>>>>>>>>> deprecate: at the beginning it says "We propose to deprecate the
>>>>>>>>> retries
>>>>>>>>>> configuration parameter for the producer and admin client" but
>> later
>>>>>> in
>>>>>>>>>> compatibility we say "Producer and admin client behavior does not
>>>>>>>>> change;
>>>>>>>>>> both still respect retries config." My understanding is that we
>> will
>>>>>>>>> only
>>>>>>>>>> deprecate the StreamsConfig#retries while not touch on
>>>>>>>>>> ProducerConfig/AdminClientConfig#retries, AND we will always
>> override
>>>>>>>>> the
>>>>>>>>>> embedded producer / admin retries config to infinity so that we
>> never
>>>>>>>>> rely
>>>>>>>>>> on those configs but always bounded by the timeout config. Is that
>>>>>>>>>> right, if yes could you clarify in the doc?
>>>>>>>>>>
>>>>>>>>>> 2) We should also document the related behavior change in
>>>>>>>>> PartitionAssignor
>>>>>>>>>> that uses AdminClient. More specifically, the admin client's
>> retries
>>>>>>>>> config
>>>>>>>>>> is piggy-backed inside InternalTopicManager as an outer-loop retry
>>>>>>>>> logic in
>>>>>>>>>> addition to AdminClient's own inner retry loop. There are some
>>>>>> existing
>>>>>>>>>> issues like KAFKA-9999 / 10006 that Sophie and Boyang have been
>> working
>>>>>>>>> on.
>>>>>>>>>> I exchanged some ideas with them, and generally we should
>> consider if
>>>>>>>>> the
>>>>>>>>>> outer-loop of InternalTopicManager should just be removed and if
>> we
>>>>>> got
>>>>>>>>>> TimeoutException we should just trigger another rebalance etc.
>>>>>>>>>>
>>>>>>>>>> BTW as I mentioned in the previous statement, today throwing an
>>>>>>>>> exception
>>>>>>>>>> that kills one thread but not the whole instance is still an
>> issue for
>>>>>>>>>> monitoring purposes, but I suppose this is not going to be in
>> this KIP
>>>>>>>>> but
>>>>>>>>>> addressed by another KIP, right?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Guozhang
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Fri, May 15, 2020 at 1:14 PM Boyang Chen <
>>>>>> reluctanthero104@gmail.com
>>>>>>>>>>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Good, good.
>>>>>>>>>>>
>>>>>>>>>>> Read through the discussions, the KIP looks good to me, +1
>>>>>>>>> (non-binding)
>>>>>>>>>>>
>>>>>>>>>>> On Fri, May 15, 2020 at 11:51 AM Sophie Blee-Goldman <
>>>>>>>>> sophie@confluent.io>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Called out!
>>>>>>>>>>>>
>>>>>>>>>>>> Seems like gmail struggles with [...] prefixed subjects. You'd
>>>>>>>>> think they
>>>>>>>>>>>> would adapt
>>>>>>>>>>>> all their practices to conform to the Apache Kafka mailing list
>>>>>>>>>>> standards,
>>>>>>>>>>>> but no!
>>>>>>>>>>>>
>>>>>>>>>>>> +1 (non-binding) by the way
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, May 15, 2020 at 11:46 AM John Roesler <
>> vvcephei@apache.org>
>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Boyang,
>>>>>>>>>>>>>
>>>>>>>>>>>>> It is a separate thread, and you have just revealed yourself
>> as a
>>>>>>>>> gmail
>>>>>>>>>>>>> user ;)
>>>>>>>>>>>>>
>>>>>>>>>>>>> (Gmail sometimes conflates vote and discuss threads for no
>>>>>>>>> apparent
>>>>>>>>>>>> reason
>>>>>>>>>>>>> )
>>>>>>>>>>>>>
>>>>>>>>>>>>> -John
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, May 15, 2020, at 13:39, Boyang Chen wrote:
>>>>>>>>>>>>>> Hey Matthias, should this be on a separate VOTE thread?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Fri, May 15, 2020 at 11:38 AM John Roesler <
>>>>>>>>> vvcephei@apache.org>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks, Matthias!
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I’m +1 (binding)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> -John
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Fri, May 15, 2020, at 11:55, Matthias J. Sax wrote:
>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I would like to start the vote on KIP-572:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-572%3A+Improve+timeouts+and+retries+in+Kafka+Streams
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Attachments:
>>>>>>>>>>>>>>>> * signature.asc
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> -- Guozhang
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> -- Guozhang
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>>
>