Posted to dev@kafka.apache.org by "Matthias J. Sax" <ma...@confluent.io> on 2019/11/25 00:43:45 UTC

Re: [DISCUSS] KIP-459: Improve KafkaStreams#close

Moved this KIP into status "inactive". Feel free to resume it at any time.

-Matthias

On 7/9/19 3:51 PM, Dongjin Lee wrote:
> Hi Matthias,
> 
> Have you thought about this issue?
> 
> Thanks,
> Dongjin
> 
> On Wed, Jun 19, 2019 at 5:07 AM Dongjin Lee <do...@apache.org> wrote:
> 
>> Hello.
>>
>> I just uploaded the draft implementation of the three proposed
>> alternatives.
>>
>> - Type A: define a close timeout constant -
>> https://github.com/dongjinleekr/kafka/tree/feature/KAFKA-7996-a
>> - Type B: Provide a new configuration option, 'close.wait.ms' -
>> https://github.com/dongjinleekr/kafka/tree/feature/KAFKA-7996-b
>> - Type C: Extend KafkaStreams constructor to support a close timeout
>> parameter -
>> https://github.com/dongjinleekr/kafka/tree/feature/KAFKA-7996-c
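As a rough illustration, the three alternatives mainly differ in where the close timeout comes from. The Java sketch below is not code from the linked branches. The class `CloseTimeoutOptions`, its methods, and the 30-second fallback are hypothetical. `close.wait.ms` is only the key name proposed for Type B, not an existing Kafka Streams config, and reading it as milliseconds is an assumption based on its `.ms` suffix.

```java
import java.time.Duration;
import java.util.Properties;

// Hypothetical sketch (not Kafka's API): where each KIP-459 alternative would
// take the timeout used to close the internal Producer and AdminClient.
final class CloseTimeoutOptions {
    // Type A: a fixed constant. The 30-second value is an assumption.
    static final Duration DEFAULT_CLOSE_TIMEOUT = Duration.ofSeconds(30);

    // Type B: the config key named in the proposal, read here as milliseconds.
    static final String CLOSE_WAIT_MS_CONFIG = "close.wait.ms";

    private CloseTimeoutOptions() {}

    // Type A: always use the constant.
    static Duration typeA() {
        return DEFAULT_CLOSE_TIMEOUT;
    }

    // Type B: read the user's config, falling back to the constant if unset.
    static Duration typeB(Properties config) {
        String value = config.getProperty(CLOSE_WAIT_MS_CONFIG);
        if (value == null) {
            return DEFAULT_CLOSE_TIMEOUT;
        }
        return requireNonNegative(Duration.ofMillis(Long.parseLong(value.trim())));
    }

    // Type C: the caller passes the timeout explicitly, e.g. as a constructor argument.
    static Duration typeC(Duration explicitTimeout) {
        return requireNonNegative(explicitTimeout);
    }

    private static Duration requireNonNegative(Duration timeout) {
        if (timeout.isNegative()) {
            throw new IllegalArgumentException("close timeout must not be negative: " + timeout);
        }
        return timeout;
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        config.setProperty(CLOSE_WAIT_MS_CONFIG, "5000");
        System.out.println(typeA());                       // PT30S
        System.out.println(typeB(config));                 // PT5S
        System.out.println(typeB(new Properties()));       // PT30S
        System.out.println(typeC(Duration.ofSeconds(10))); // PT10S
    }
}
```

In this framing, Type A gives users no control over the value, while B and C both expose it and differ mainly in whether it travels through the configuration or through the constructor.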
>>
>> As you can see in the branches, Types B and C are a little more
>> complicated than A, since they provide an option to control the timeout
>> used to close the AdminClient and Producer. To provide that functionality,
>> B and C share a refactoring commit, which replaces KafkaStreams#create with
>> KafkaStreams.builder. That is why each of them consists of two commits.
>>
>> Please have a look when you are free.
>>
>> Thanks,
>> Dongjin
>>
>> On Thu, May 30, 2019 at 12:26 PM Dongjin Lee <do...@apache.org> wrote:
>>
>>> I just updated the KIP document to reflect what I found about the client
>>> API inconsistency and Matthias's comments. Since it is now clear that
>>> modifying the default close timeout of the clients is not feasible, the
>>> updated document proposes entirely different alternatives. (See the
>>> Rejected Alternatives section.)
>>>
>>>
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-459%3A+Improve+KafkaStreams%23close
>>>
>>> Please have a look when you are free. All kinds of feedback are welcome!
>>>
>>> Thanks,
>>> Dongjin
>>>
>>> On Fri, May 24, 2019 at 1:07 PM Matthias J. Sax <ma...@confluent.io>
>>> wrote:
>>>
>>>> Thanks for digging into the background.
>>>>
>>>> I think it would be good to get feedback from people who work on
>>>> clients, too.
>>>>
>>>>
>>>> -Matthias
>>>>
>>>>
>>>> On 5/19/19 12:58 PM, Dongjin Lee wrote:
>>>>> Hi Matthias,
>>>>>
>>>>> I investigated the inconsistencies between the `close` semantics of
>>>>> `Producer`, `Consumer`, and `AdminClient`, and found that this
>>>>> inconsistency was intended. Here are the details:
>>>>>
>>>>> The current 30-second default timeout of `KafkaConsumer#close` was
>>>>> introduced in KIP-102 (0.10.2.0)[^1]. According to that document[^2],
>>>>> there are two differences between `Consumer` and `Producer`:
>>>>>
>>>>> 1. `Consumer`s don't have large requests.
>>>>> 2. `Consumer#close` is affected by consumer coordinator, whose close
>>>>> operation is affected by `request.timeout.ms`.
>>>>>
>>>>> For these reasons, the Consumer's default timeout was set differently.[^3]
>>>>> (This was done by Rajini.)
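The asymmetry described above (and quoted in footnote [^2]) comes down to which default the no-argument `close()` hands to the timed close. Below is a minimal sketch of that delegation pattern. `TimedClient`, `SketchConsumer`, and `SketchProducer` are hypothetical stand-ins that only record the timeout they would block for. They are not the real `KafkaConsumer` or `KafkaProducer` classes.

```java
import java.time.Duration;

// Hypothetical stand-ins for the default-timeout pattern described in KIP-102.
// They only record how long close() would be allowed to block.
abstract class TimedClient implements AutoCloseable {
    Duration lastCloseTimeout;

    // The timeout used when close() is called without an explicit one.
    abstract Duration defaultCloseTimeout();

    // The no-argument close delegates to the timed variant with the client's default.
    @Override
    public void close() {
        close(defaultCloseTimeout());
    }

    void close(Duration timeout) {
        // A real client would flush or leave its group here, blocking at most `timeout`.
        lastCloseTimeout = timeout;
    }
}

class SketchConsumer extends TimedClient {
    @Override
    Duration defaultCloseTimeout() {
        return Duration.ofSeconds(30); // consumer default per KIP-102
    }
}

class SketchProducer extends TimedClient {
    @Override
    Duration defaultCloseTimeout() {
        return Duration.ofMillis(Long.MAX_VALUE); // producer default: effectively "wait forever"
    }
}

class DefaultCloseDemo {
    public static void main(String[] args) {
        SketchConsumer consumer = new SketchConsumer();
        SketchProducer producer = new SketchProducer();
        consumer.close();
        producer.close();
        System.out.println(consumer.lastCloseTimeout.getSeconds());                  // 30
        System.out.println(producer.lastCloseTimeout.toMillis() == Long.MAX_VALUE);  // true
    }
}
```

Under this pattern, changing either default silently changes what every existing `close()` call does, which is why such a change raises the compatibility questions discussed in this thread.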
>>>>>
>>>>> In my initial proposal, I proposed changing the default timeout of
>>>>> `[Producer, AdminClient]#close` from `Long.MAX_VALUE` to another value.
>>>>> However, since it is now clear that the current implementation is
>>>>> entirely reasonable, *it seems more suitable to change the approach to
>>>>> simply providing a close timeout to the clients used by KafkaStreams.*[^4]
>>>>> This approach has the following advantages:
>>>>>
>>>>> 1. The problem described in KAFKA-7996 is resolved, since the Producer
>>>>> no longer hangs during its `close` operation.
>>>>> 2. We don't have to change the semantics of `Producer#close`,
>>>>> `AdminClient#close`, or `KafkaStreams#close`. As you pointed out, these
>>>>> kinds of changes are hard for users to reason about.
>>>>>
>>>>> What do you think?
>>>>>
>>>>> Thanks,
>>>>> Dongjin
>>>>>
>>>>> [^1]: https://cwiki.apache.org/confluence/display/KAFKA/KIP-102+-+Add+close+with+timeout+for+consumers
>>>>> [^2]: "The existing close() method without a timeout will attempt to close
>>>>> the consumer gracefully with a default timeout of 30 seconds. This is
>>>>> different from the producer default of Long.MAX_VALUE since consumers don't
>>>>> have large requests."
>>>>> [^3]: The 'Rejected Alternatives' section explains it.
>>>>> [^4]: In the case of the Streams reset tool, `KafkaAdminClient`'s close
>>>>> timeout is 60 seconds (KIP-198): https://github.com/apache/kafka/pull/3927/files
>>>>>
>>>>> On Fri, Apr 26, 2019 at 5:16 PM Matthias J. Sax <matthias@confluent.io> wrote:
>>>>>
>>>>>> Thanks for the KIP.
>>>>>>
>>>>>> Overall, I agree with the sentiment of the KIP. The current semantics of
>>>>>> `KafkaStreams#close(timeout)` are not well defined. Also the general
>>>>>> client inconsistencies are annoying.
>>>>>>
>>>>>>
>>>>>>> This KIP does not make any change to public interfaces; however, it
>>>>>>> makes a subtle change to the existing API's semantics. If this KIP is
>>>>>>> accepted, documenting these semantics in as much detail as possible
>>>>>>> would be much better.
>>>>>>
>>>>>> I am not sure I would call this change "subtle". It might actually have
>>>>>> a rather big impact, and hence I am also wondering about backward
>>>>>> compatibility (details below). Overall, I am not sure documenting the
>>>>>> change would be sufficient.
>>>>>>
>>>>>>
>>>>>>
>>>>>>> Change the default close timeout of Producer and AdminClient to a more
>>>>>>> reasonable value than Long.MAX_VALUE.
>>>>>>
>>>>>> Can you be more specific than "more reasonable" and propose a concrete
>>>>>> value? What about backward compatibility? Assume an application wants
>>>>>> to block forever by default: with this change, it would need to rewrite
>>>>>> its code to keep the intended semantics. Hence, the change does not seem
>>>>>> to be backward compatible. Also note that a config change would not be
>>>>>> sufficient; an actual code change would be required.
>>>>>>
>>>>>> Also, why not go the other direction and make `KafkaConsumer#close()`
>>>>>> default to Long.MAX_VALUE, too? Note that the current KafkaStreams#close()
>>>>>> also uses Long.MAX_VALUE (i.e., across all 4 clients, it's 50:50). Of
>>>>>> course, a similar backward compatibility concern arises.
>>>>>>
>>>>>> Making close() blocking by default does not seem unreasonable per se. Can
>>>>>> you elaborate on why Long.MAX_VALUE is "bad" compared to, e.g., 30 seconds?
>>>>>>
>>>>>>
>>>>>>> If it succeeds, simply return; if not, close the remaining resources
>>>>>>> with the default close timeout.
>>>>>>
>>>>>> Why do you want to apply the default timeout as a fallback? This would
>>>>>> violate the user's intention, too, and thus might result in a situation
>>>>>> that is not much better than the current one.
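The concern can be made concrete with a little arithmetic: if the first attempt uses up the whole user timeout and the fallback then applies the default timeout, the call can block for the sum of the two, or longer if several leftover resources each get their own default. The sketch below is hypothetical, and the 30-second default is an assumed value, since the quoted KIP text does not name one.

```java
import java.time.Duration;

// Hypothetical arithmetic sketch of the quoted fallback: first try to close
// within the user's timeout; whatever is still open afterwards is closed with
// the default timeout. Nothing here talks to real clients.
final class FallbackCloseBound {
    private FallbackCloseBound() {}

    // Upper bound on how long close() could block. `remainingClosedSequentially`
    // is how many leftover resources each get a fresh default timeout, one after
    // another (use 1 if they share a single default budget).
    static Duration worstCaseBlocking(Duration userTimeout, Duration defaultTimeout,
                                      int remainingClosedSequentially) {
        if (remainingClosedSequentially < 0) {
            throw new IllegalArgumentException("count must not be negative");
        }
        return userTimeout.plus(defaultTimeout.multipliedBy(remainingClosedSequentially));
    }

    public static void main(String[] args) {
        Duration user = Duration.ofSeconds(5);
        Duration fallback = Duration.ofSeconds(30); // assumed default, for illustration only
        System.out.println(worstCaseBlocking(user, fallback, 1)); // PT35S
        System.out.println(worstCaseBlocking(user, fallback, 2)); // PT1M5S
    }
}
```

A user who asked for 5 seconds could therefore wait 35 seconds or more, which is the sense in which the fallback overrides the user's intention.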
>>>>>>
>>>>>>
>>>>>> For KafkaStreams, if there are multiple StreamThreads, would the full
>>>>>> timeout be passed to each thread, since all of them could shut down in
>>>>>> parallel? Or would the timeout be divided across all threads?
>>>>>>
>>>>>> What about cases where the number of clients differs? For example,
>>>>>> with EOS enabled, there are multiple Producers that need to be closed;
>>>>>> however, the user might not even be aware of the increased number of
>>>>>> producers (or know how many there actually are).
>>>>>>
>>>>>>
>>>>>> It seems hard for users to reason about these dependencies.
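The question of how one timeout should cover several clients can be illustrated with a small simulation. When clients are closed one after another, passing the full timeout to each lets the total grow with the number of clients (for example, the extra producers under EOS), whereas deriving one deadline up front and giving each client only the remaining time keeps the total within the user's timeout. Everything here is hypothetical: the method names, the simulated close durations, and the simplification that a client just stops waiting at its timeout. It is not taken from the Kafka Streams code base.

```java
import java.time.Duration;
import java.util.Arrays;
import java.util.List;

// Hypothetical simulation: closing several internal clients one after another
// under a single user-supplied timeout. Durations are simulated, not measured.
final class SharedDeadlineClose {
    private SharedDeadlineClose() {}

    // A simulated client that needs `needed` to close cleanly but stops
    // waiting once `timeout` has passed. Returns how long the close blocked.
    static Duration closeClient(Duration needed, Duration timeout) {
        return needed.compareTo(timeout) <= 0 ? needed : timeout;
    }

    // Option 1: every client gets the full timeout, so the total can reach n * timeout.
    static Duration closeEachWithFullTimeout(List<Duration> needed, Duration timeout) {
        Duration total = Duration.ZERO;
        for (Duration d : needed) {
            total = total.plus(closeClient(d, timeout));
        }
        return total;
    }

    // Option 2: one deadline for all clients; each gets only the time that is left,
    // so the total never exceeds the timeout (later clients may get zero).
    static Duration closeWithSharedDeadline(List<Duration> needed, Duration timeout) {
        Duration elapsed = Duration.ZERO;
        for (Duration d : needed) {
            Duration remaining = timeout.minus(elapsed);
            elapsed = elapsed.plus(closeClient(d, remaining));
        }
        return elapsed;
    }

    public static void main(String[] args) {
        // E.g. three producers under EOS, each needing 20 s, with a 30 s user timeout.
        List<Duration> needed = Arrays.asList(
                Duration.ofSeconds(20), Duration.ofSeconds(20), Duration.ofSeconds(20));
        Duration timeout = Duration.ofSeconds(30);
        System.out.println(closeEachWithFullTimeout(needed, timeout)); // PT1M
        System.out.println(closeWithSharedDeadline(needed, timeout));  // PT30S
    }
}
```

If the clients are instead closed in parallel, as the StreamThreads could be, passing the full timeout to each would not add up in the same way; the sketch only covers the sequential case.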
>>>>>>
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>>
>>>>>> On 4/23/19 6:13 PM, Dongjin Lee wrote:
>>>>>>> Hi dev,
>>>>>>>
>>>>>>> I would like to start the discussion of KIP-459: Improve KafkaStreams#close
>>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-459%3A+Improve+KafkaStreams%23close>.
>>>>>>> This proposal originated from an issue reported via the community Slack,
>>>>>>> KAFKA-7996 <https://issues.apache.org/jira/browse/KAFKA-7996>. In short,
>>>>>>> this KIP proposes to resolve this problem by improving the existing API's
>>>>>>> semantics, without any public API changes.
>>>>>>>
>>>>>>> Please have a look when you are free. All opinions will be highly
>>>>>>> appreciated.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Dongjin
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>
>>> --
>>> *Dongjin Lee*
>>>
>>> *A hitchhiker in the mathematical world.*
>>> *github: github.com/dongjinleekr <https://github.com/dongjinleekr>
>>> linkedin: kr.linkedin.com/in/dongjinleekr <https://kr.linkedin.com/in/dongjinleekr>
>>> speakerdeck: speakerdeck.com/dongjin <https://speakerdeck.com/dongjin>*
>>>
>>
>>
>>
> 
>