Posted to dev@kafka.apache.org by Dongjin Lee <do...@apache.org> on 2019/05/19 10:58:15 UTC

Re: [DISCUSS] KIP-459: Improve KafkaStreams#close

Hi Matthias,

I investigated the inconsistencies between the `close` semantics of `Producer`,
`Consumer`, and `AdminClient`, and found that this inconsistency is
intentional. Here are the details:

The current default timeout of `KafkaConsumer#close`, 30 seconds, was
introduced in KIP-102 (0.10.2.0)[^1]. According to that document, there are
two differences between `Consumer` and `Producer`:

1. `Consumer`s don't have large requests.[^2]
2. `Consumer#close` is affected by the consumer coordinator, whose close
operation is bounded by `request.timeout.ms`.

For these reasons, the Consumer's default timeout was set differently.[^3]
(This was done by Rajini.)
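To make the difference concrete, here is a small self-contained Python sketch
(this is not Kafka code; the client behavior is simulated) contrasting a
bounded default close with an effectively unbounded one:

```python
import threading

class ToyClient:
    """Simulates a client whose close() waits for in-flight work to drain."""
    def __init__(self, drain_seconds):
        self._done = threading.Event()
        timer = threading.Timer(drain_seconds, self._done.set)
        timer.daemon = True  # don't keep the process alive just to drain
        timer.start()

    def close(self, timeout=None):
        # timeout=None mimics Producer#close()'s Long.MAX_VALUE default:
        # block until all work drains. A finite timeout mimics
        # KafkaConsumer#close()'s bounded 30-second default.
        return self._done.wait(timeout)  # True if drained, False on timeout

# Consumer-style close: bounded -- gives up on a client that drains slowly.
slow = ToyClient(drain_seconds=5.0)
assert slow.close(timeout=0.2) is False

# Producer-style close: unbounded -- returns only once work has drained.
fast = ToyClient(drain_seconds=0.1)
assert fast.close() is True
```

The point of the sketch is only that with a `Long.MAX_VALUE`-style default,
the `slow` case above would hang instead of returning.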

In the initial proposal, I suggested changing the default timeout of
`Producer#close` and `AdminClient#close` from `Long.MAX_VALUE` to a finite
value. However, since it is now clear that the current implementation is
reasonable, *a more suitable approach seems to be simply providing a
close timeout to the clients used by KafkaStreams.*[^4]
This approach has the following advantages:

1. The problem described in KAFKA-7996 is resolved, since the Producer no
longer hangs during its `close` operation.
2. We don't have to change the semantics of `Producer#close`,
`AdminClient#close`, or `KafkaStreams#close`. As you pointed out, such
changes are hard for users to reason about.
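As an illustration of what "providing a close timeout to the clients" could
mean in practice, here is a hedged Python sketch (the name `close_all` and
the deadline arithmetic are mine, not from the KIP): `KafkaStreams#close`
would spread its caller-supplied budget over the internal clients via a
shared deadline, so no single client can consume more than the remaining
time.

```python
import time

def close_all(clients, timeout_seconds):
    """Close every client against one shared deadline (illustrative only)."""
    deadline = time.monotonic() + timeout_seconds
    for client in clients:
        remaining = max(0.0, deadline - time.monotonic())
        client.close(timeout=remaining)

class RecordingClient:
    """Stand-in client that records the close budget it was handed."""
    def __init__(self):
        self.granted = None

    def close(self, timeout):
        self.granted = timeout

producer, admin = RecordingClient(), RecordingClient()
close_all([producer, admin], timeout_seconds=5.0)

# Every client is closed with a bounded timeout carved out of the caller's
# budget, instead of the Long.MAX_VALUE default of Producer/AdminClient.
assert 0.0 <= admin.granted <= producer.granted <= 5.0
```

Whether the budget should be shared like this or divided up front is exactly
the question Matthias raises below about multiple StreamThreads.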

What do you think?

Thanks,
Dongjin

[^1]:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-102+-+Add+close+with+timeout+for+consumers
[^2]: "The existing close() method without a timeout will attempt to close
the consumer gracefully with a default timeout of 30 seconds. This is
different from the producer default of Long.MAX_VALUE since consumers don't
have large requests."
[^3]: The 'Rejected Alternatives' section explains this.
[^4]: In the case of the Streams reset tool, `KafkaAdminClient`'s close
timeout is 60 seconds (KIP-198): https://github.com/apache/kafka/pull/3927/files

On Fri, Apr 26, 2019 at 5:16 PM Matthias J. Sax <ma...@confluent.io>
wrote:

> Thanks for the KIP.
>
> Overall, I agree with the sentiment of the KIP. The current semantics of
> `KafkaStreams#close(timeout)` are not well defined. Also the general
> client inconsistencies are annoying.
>
>
> > This KIP does not make any change on public interfaces; however, it makes
> > a subtle change to the existing API's semantics. If this KIP is accepted,
> > documenting these semantics with as much detail as possible would be much
> > better.
>
> I am not sure I would call this change "subtle". It might actually have a
> rather big impact, and hence I am also wondering about backward
> compatibility (details below). Overall, I am not sure that documenting the
> change would be sufficient.
>
>
>
> > Change the default close timeout of Producer, AdminClient into more
> reasonable one, not Long.MAX_VALUE.
>
> Can you be more specific than "more reasonable", and propose a concrete
> value? What about backward compatibility? Assume an application wants to
> block forever by default: with this change, it would need to rewrite its
> code to keep the intended semantics. Hence, the change does not seem to
> be backward compatible. Also note that a config change would not be
> sufficient; an actual code change would be required.
>
> Also, why not go the other direction and default `KafkaConsumer#close()`
> to use Long.MAX_VALUE, too? Note that the current KafkaStreams#close() also
> uses Long.MAX_VALUE (i.e., over all 4 clients, it's 50:50). Of course, a
> similar backward compatibility concern arises.
>
> Making close() blocking by default seems not unreasonable per se. Can
> you elaborate on why Long.MAX_VALUE is "bad" compared to, e.g., 30 seconds?
>
>
> > If succeeded, simply return; if not, close remaining resources with
> default close timeout.
>
> Why do you want to apply the default timeout as fallback? This would
> violate the user intention, too, and thus, might result in a situation
> that is not much better than the current one.
>
>
> For KafkaStreams, if there are multiple StreamThreads, would the full
> timeout be passed into each thread as all of them could shut down in
> parallel? Or would the timeout be divided over all threads?
>
> What about the case where there is a different number of clients? For
> example, with EOS enabled, there are multiple Producers that need to be
> closed; however, the user might not even be aware of the increased
> number of producers (or know how many there actually are).
>
>
> It seems to be hard for users to reason about those dependencies.
>
>
> -Matthias
>
>
> On 4/23/19 6:13 PM, Dongjin Lee wrote:
> > Hi dev,
> >
> > I would like to start the discussion of KIP-459: Improve
> KafkaStreams#close
> > <
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-459%3A+Improve+KafkaStreams%23close
> >.
> > This proposal originated from the issue reported via the community Slack,
> > KAFKA-7996 <https://issues.apache.org/jira/browse/KAFKA-7996>. In short,
> > this KIP proposes to resolve this problem by improving the existing API's
> > semantics, without any public API changes.
> >
> > Please have a look when you are free. All opinions will be highly
> > appreciated.
> >
> > Thanks,
> > Dongjin
> >
>
>

-- 
*Dongjin Lee*

*A hitchhiker in the mathematical world.*
*github: github.com/dongjinleekr <https://github.com/dongjinleekr>*
*linkedin: kr.linkedin.com/in/dongjinleekr <https://kr.linkedin.com/in/dongjinleekr>*
*speakerdeck: speakerdeck.com/dongjin <https://speakerdeck.com/dongjin>*

Re: [DISCUSS] KIP-459: Improve KafkaStreams#close

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Moved this KIP into status "inactive". Feel free to resume it at any time.

-Matthias



Re: [DISCUSS] KIP-459: Improve KafkaStreams#close

Posted by Dongjin Lee <do...@apache.org>.
Hi Matthias,

Have you thought about this issue?

Thanks,
Dongjin



-- 
*Dongjin Lee*

*A hitchhiker in the mathematical world.*
>> *github: github.com/dongjinleekr <https://github.com/dongjinleekr>*
>> *linkedin: kr.linkedin.com/in/dongjinleekr <https://kr.linkedin.com/in/dongjinleekr>*
>> *speakerdeck: speakerdeck.com/dongjin <https://speakerdeck.com/dongjin>*

Re: [DISCUSS] KIP-459: Improve KafkaStreams#close

Posted by Dongjin Lee <do...@apache.org>.
Hello.

I just uploaded the draft implementation of the three proposed alternatives.

- Type A: Define a close timeout constant -
https://github.com/dongjinleekr/kafka/tree/feature/KAFKA-7996-a
- Type B: Provide a new configuration option, 'close.wait.ms' -
https://github.com/dongjinleekr/kafka/tree/feature/KAFKA-7996-b
- Type C: Extend KafkaStreams constructor to support a close timeout
parameter - https://github.com/dongjinleekr/kafka/tree/feature/KAFKA-7996-c

As you can see in the branches, Types B and C are a little more complicated
than A, since they provide an option to control the timeout used to close
the AdminClient and Producer. To provide that functionality, B and C
share a refactoring commit, which replaces KafkaStreams#create with
KafkaStreams.builder. This is why they consist of two commits.
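For the Type B alternative, the new option could be read and validated like
any other config entry. A minimal Python sketch, assuming the 'close.wait.ms'
key from the branch above and a made-up default (the real default and
validation rules would be defined by the KIP, not here):

```python
DEFAULT_CLOSE_WAIT_MS = 30_000  # hypothetical default, for illustration only

def parse_close_wait_ms(config):
    """Read a 'close.wait.ms' entry from a Streams-style config dict."""
    raw = config.get("close.wait.ms", DEFAULT_CLOSE_WAIT_MS)
    value = int(raw)
    if value < 0:
        raise ValueError("close.wait.ms must be >= 0, got %d" % value)
    return value

assert parse_close_wait_ms({}) == 30_000
assert parse_close_wait_ms({"close.wait.ms": "5000"}) == 5000
```

The parsed value would then be passed as the close timeout of the internal
AdminClient and Producer, which is what makes B and C larger than A.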

Please have a look when you are free.

Thanks,
Dongjin



-- 
*Dongjin Lee*

*A hitchhiker in the mathematical world.*
*github: github.com/dongjinleekr <https://github.com/dongjinleekr>*
*linkedin: kr.linkedin.com/in/dongjinleekr <https://kr.linkedin.com/in/dongjinleekr>*
*speakerdeck: speakerdeck.com/dongjin <https://speakerdeck.com/dongjin>*

Re: [DISCUSS] KIP-459: Improve KafkaStreams#close

Posted by Dongjin Lee <do...@apache.org>.
I just updated the KIP document to reflect what I found about the clients'
API inconsistency and Matthias's comments. Since it is now clear that
modifying the clients' default close timeouts is not feasible, the updated
document proposes an entirely different approach (see the Rejected
Alternatives section).

https://cwiki.apache.org/confluence/display/KAFKA/KIP-459%3A+Improve+KafkaStreams%23close
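
To make the direction concrete: the idea is that KafkaStreams would hold a
single shutdown deadline and pass only the remaining time into each owned
client's `close` call, so the overall shutdown stays bounded even with
multiple producers or threads. The class below is only a hypothetical
sketch of that budgeting logic in plain Java (the name `CloseBudget` is
mine, not Streams code):

```java
import java.time.Duration;

// Hypothetical sketch: a single deadline shared by all clients being
// closed, so each close(...) call only receives the time still left.
public class CloseBudget {
    private final long deadlineNanos;

    public CloseBudget(Duration total) {
        this.deadlineNanos = System.nanoTime() + total.toNanos();
    }

    // Remaining time in the overall budget; never negative.
    public Duration remaining() {
        long left = deadlineNanos - System.nanoTime();
        return left > 0 ? Duration.ofNanos(left) : Duration.ZERO;
    }

    public static void main(String[] args) {
        CloseBudget budget = new CloseBudget(Duration.ofSeconds(30));
        // Each owned client would then be closed with the leftover time,
        // e.g. producer.close(budget.remaining()) for every producer,
        // consumer, and admin client in turn.
        System.out.println(budget.remaining().isNegative()); // prints "false"
    }
}
```

With a shared budget like this, a slow `Producer#close` cannot push the
total shutdown past the timeout the user passed to `KafkaStreams#close`.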

Please have a look when you are free. All feedback is welcome!

Thanks,
Dongjin

On Fri, May 24, 2019 at 1:07 PM Matthias J. Sax <ma...@confluent.io>
wrote:

> Thanks for digging into the background.
>
> I think it would be good to get feedback from people who work on
> clients, too.
>
>
> -Matthias
>
>
> On 5/19/19 12:58 PM, Dongjin Lee wrote:
> > Hi Matthias,
> >
> > I investigated the inconsistencies between `close` semantics of
> `Producer`,
> > `Consumer`, and `AdminClient`. And I found that this inconsistency was
> > intended. Here are the details:
> >
> > The current `KafkaConsumer#close`'s default timeout, 30 seconds, was
> > introduced in KIP-102 (0.10.2.0)[^1]. According to the document, there
> are
> > two differences between `Consumer` and `Producer`;
> >
> > 1. `Consumer`s don't have large requests.
> > 2. `Consumer#close` is affected by consumer coordinator, whose close
> > operation is affected by `request.timeout.ms`.
> >
> > For the above reasons, the Consumer's default timeout was set a little
> > differently.[^3] (This was done by Rajini.)
> >
> > In the initial proposal, I proposed changing the default timeout value of
> > `[Producer, AdminClient]#close` from `Long.MAX_VALUE` to another one.
> > However, since it is now clear that the current implementation is
> > reasonable, *it seems more suitable to change the approach to simply
> > providing a close timeout to the clients used by KafkaStreams.*[^4]
> > This approach has the following advantages:
> >
> > 1. The problem described in KAFKA-7996 is now resolved, since the Producer
> > no longer hangs during its `close` operation.
> > 2. We don't have to change the semantics of `Producer#close`,
> > `AdminClient#close` nor `KafkaStreams#close`. As you pointed out, these
> > kinds of changes are hard for users to reason about.
> >
> > What do you think?
> >
> > Thanks,
> > Dongjin
> >
> > [^1]:
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-102+-+Add+close+with+timeout+for+consumers
> > [^2]: "The existing close() method without a timeout will attempt to
> close
> > the consumer gracefully with a default timeout of 30 seconds. This is
> > different from the producer default of Long.MAX_VALUE since consumers
> don't
> > have large requests."
> > [^3]: 'Rejected Alternatives' section explains it.
> > [^4]: In the case of Streams reset tool, `KafkaAdminClient`'s close
> timeout
> > is 60 seconds (KIP-198): https://github.com/apache/kafka/pull/3927/files
> >
> > On Fri, Apr 26, 2019 at 5:16 PM Matthias J. Sax <ma...@confluent.io>
> > wrote:
> >
> >> Thanks for the KIP.
> >>
> >> Overall, I agree with the sentiment of the KIP. The current semantics of
> >> `KafkaStreams#close(timeout)` are not well defined. Also the general
> >> client inconsistencies are annoying.
> >>
> >>
> >>> This KIP does not make any change to public interfaces; however, it makes
> >>> a subtle change to the existing API's semantics. If this KIP is accepted,
> >>> documenting these semantics with as much detail as possible would be much
> >>> better.
> >>
> >> I am not sure if I would call this change "subtle". It might actually be
> >> rather big impact and hence I am also wondering about backward
> >> compatibility (details below). Overall, I am not sure if documenting the
> >> change would be sufficient.
> >>
> >>
> >>
> >>> Change the default close timeout of Producer, AdminClient into more
> >> reasonable one, not Long.MAX_VALUE.
> >>
> >> Can you be more specific than "more reasonable", and propose a concrete
> >> value? What about backward compatibility? Assume an application wants to
> >> block forever by default: with this change, it's required to rewrite
> >> code to keep the intended semantics. Hence, the change does not seem to
> >> be backward compatible. Also note that a config change would not be
> >> sufficient, but an actual code change would be required.
> >>
> >> Also, why not go the other direction and default `KafkaConsumer#close()`
> >> to use Long.MAX_VALUE, too? Note that current KafkaStreams#close() also
> >> uses Long.MAX_VALUE (i.e., over all 4 clients, it's 50:50). Of course, a
> >> similar backward compatibility concern arises.
> >>
> >> Making close() blocking by default seems not unreasonable per se. Can
> >> you elaborate why Long.MAX_VALUE is "bad" compared to, e.g., 30 seconds?
> >>
> >>
> >>> If succeeded, simply return; if not, close remaining resources with
> >> default close timeout.
> >>
> >> Why do you want to apply the default timeout as fallback? This would
> >> violate the user intention, too, and thus, might result in a situation
> >> that is not much better than the current one.
> >>
> >>
> >> For KafkaStreams, if there are multiple StreamThreads, would the full
> >> timeout be passed into each thread as all of them could shut down in
> >> parallel? Or would the timeout be divided over all threads?
> >>
> >> What about the case when there is a different number of clients? For
> >> example, with EOS enabled, there are multiple Producers that need to be
> >> closed, however, the user might not even be aware of the increased
> >> number of producers (or not know how many there actually are).
> >>
> >>
> >> It seems to be hard for users to reason about those dependencies.
> >>
> >>
> >> -Matthias
> >>
> >>
> >> On 4/23/19 6:13 PM, Dongjin Lee wrote:
> >>> Hi dev,
> >>>
> >>> I would like to start the discussion of KIP-459: Improve
> >> KafkaStreams#close
> >>> <
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-459%3A+Improve+KafkaStreams%23close
> >>> .
> >>> This proposal originated from the issue reported via community slack,
> >>> KAFKA-7996 <https://issues.apache.org/jira/browse/KAFKA-7996>. In short,
> >>> this KIP proposes to resolve this problem by improving existing API's
> >>> semantics, not adding any public API changes.
> >>>
> >>> Please have a look when you are free. All opinions will be highly
> >>> appreciated.
> >>>
> >>> Thanks,
> >>> Dongjin
> >>>
> >>
> >>
> >
>
>

-- 
*Dongjin Lee*

*A hitchhiker in the mathematical world.*
*github: github.com/dongjinleekr <https://github.com/dongjinleekr>
linkedin: kr.linkedin.com/in/dongjinleekr <https://kr.linkedin.com/in/dongjinleekr>
speakerdeck: speakerdeck.com/dongjin <https://speakerdeck.com/dongjin>*

Re: [DISCUSS] KIP-459: Improve KafkaStreams#close

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Thanks for digging into the background.

I think it would be good to get feedback from people who work on
clients, too.


-Matthias


On 5/19/19 12:58 PM, Dongjin Lee wrote:
> Hi Matthias,
> 
> I investigated the inconsistencies between `close` semantics of `Producer`,
> `Consumer`, and `AdminClient`. And I found that this inconsistency was
> intended. Here are the details:
> 
> The current `KafkaConsumer#close`'s default timeout, 30 seconds, was
> introduced in KIP-102 (0.10.2.0)[^1]. According to the document, there are
> two differences between `Consumer` and `Producer`;
> 
> 1. `Consumer`s don't have large requests.
> 2. `Consumer#close` is affected by consumer coordinator, whose close
> operation is affected by `request.timeout.ms`.
> 
> For the above reasons, the Consumer's default timeout was set a little
> differently.[^3] (This was done by Rajini.)
> 
> In the initial proposal, I proposed changing the default timeout value of
> `[Producer, AdminClient]#close` from `Long.MAX_VALUE` to another one.
> However, since it is now clear that the current implementation is
> reasonable, *it seems more suitable to change the approach to simply
> providing a close timeout to the clients used by KafkaStreams.*[^4]
> This approach has the following advantages:
> 
> 1. The problem described in KAFKA-7996 is now resolved, since the Producer
> no longer hangs during its `close` operation.
> 2. We don't have to change the semantics of `Producer#close`,
> `AdminClient#close` nor `KafkaStreams#close`. As you pointed out, these
> kinds of changes are hard for users to reason about.
> 
> What do you think?
> 
> Thanks,
> Dongjin
> 
> [^1]:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-102+-+Add+close+with+timeout+for+consumers
> [^2]: "The existing close() method without a timeout will attempt to close
> the consumer gracefully with a default timeout of 30 seconds. This is
> different from the producer default of Long.MAX_VALUE since consumers don't
> have large requests."
> [^3]: 'Rejected Alternatives' section explains it.
> [^4]: In the case of Streams reset tool, `KafkaAdminClient`'s close timeout
> is 60 seconds (KIP-198): https://github.com/apache/kafka/pull/3927/files
> 
> On Fri, Apr 26, 2019 at 5:16 PM Matthias J. Sax <ma...@confluent.io>
> wrote:
> 
>> Thanks for the KIP.
>>
>> Overall, I agree with the sentiment of the KIP. The current semantics of
>> `KafkaStreams#close(timeout)` are not well defined. Also the general
>> client inconsistencies are annoying.
>>
>>
>>> This KIP does not make any change to public interfaces; however, it makes
>>> a subtle change to the existing API's semantics. If this KIP is accepted,
>>> documenting these semantics with as much detail as possible would be much
>>> better.
>>
>> I am not sure if I would call this change "subtle". It might actually be
>> rather big impact and hence I am also wondering about backward
>> compatibility (details below). Overall, I am not sure if documenting the
>> change would be sufficient.
>>
>>
>>
>>> Change the default close timeout of Producer, AdminClient into more
>> reasonable one, not Long.MAX_VALUE.
>>
>> Can you be more specific than "more reasonable", and propose a concrete
>> value? What about backward compatibility? Assume an application wants to
>> block forever by default: with this change, it's required to rewrite
>> code to keep the intended semantics. Hence, the change does not seem to
>> be backward compatible. Also note that a config change would not be
>> sufficient, but an actual code change would be required.
>>
>> Also, why not go the other direction and default `KafkaConsumer#close()`
>> to use Long.MAX_VALUE, too? Note that current KafkaStreams#close() also
>> uses Long.MAX_VALUE (i.e., over all 4 clients, it's 50:50). Of course, a
>> similar backward compatibility concern arises.
>>
>> Making close() blocking by default seems not unreasonable per se. Can
>> you elaborate why Long.MAX_VALUE is "bad" compared to, e.g., 30 seconds?
>>
>>
>>> If succeeded, simply return; if not, close remaining resources with
>> default close timeout.
>>
>> Why do you want to apply the default timeout as fallback? This would
>> violate the user intention, too, and thus, might result in a situation
>> that is not much better than the current one.
>>
>>
>> For KafkaStreams, if there are multiple StreamThreads, would the full
>> timeout be passed into each thread as all of them could shut down in
>> parallel? Or would the timeout be divided over all threads?
>>
>> What about the case when there is a different number of clients? For
>> example, with EOS enabled, there are multiple Producers that need to be
>> closed, however, the user might not even be aware of the increased
>> number of producers (or not know how many there actually are).
>>
>>
>> It seems to be hard for users to reason about those dependencies.
>>
>>
>> -Matthias
>>
>>
>> On 4/23/19 6:13 PM, Dongjin Lee wrote:
>>> Hi dev,
>>>
>>> I would like to start the discussion of KIP-459: Improve
>> KafkaStreams#close
>>> <
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-459%3A+Improve+KafkaStreams%23close
>>> .
>>> This proposal originated from the issue reported via community slack,
>>> KAFKA-7996 <https://issues.apache.org/jira/browse/KAFKA-7996>. In short,
>>> this KIP proposes to resolve this problem by improving existing API's
>>> semantics, not adding any public API changes.
>>>
>>> Please have a look when you are free. All opinions will be highly
>>> appreciated.
>>>
>>> Thanks,
>>> Dongjin
>>>
>>
>>
>