Posted to dev@kafka.apache.org by Bruno Cadonna <br...@confluent.io> on 2020/09/01 09:30:31 UTC

Re: [DISCUSS] KIP-663: API to Start and Shut Down Stream Threads and to Request Closing of Kafka Streams Clients

Hi,

I updated the KIP with the feedback so far. I removed the API to close 
the Kafka Streams client asynchronously, since it should be possible to 
avoid the deadlock with the existing close() method, which does not require a KIP.
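A minimal sketch of what avoiding the deadlock with the existing blocking close() can look like. It assumes the deadlock in question is the one where close() is called from a stream thread's uncaught exception handler and then waits for that same thread to terminate. `FakeStreamsClient` and `CloseFromHandlerSketch` are hypothetical stand-ins, not Kafka Streams classes; the handler simply hands close() to a separate thread:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a Streams client whose close() blocks until its thread exits.
final class FakeStreamsClient {
    private final Thread streamThread;
    private final CountDownLatch closed = new CountDownLatch(1);

    FakeStreamsClient(final Runnable work) {
        streamThread = new Thread(work, "fake-stream-thread-1");
    }

    void setUncaughtExceptionHandler(final Thread.UncaughtExceptionHandler handler) {
        streamThread.setUncaughtExceptionHandler(handler);
    }

    void start() {
        streamThread.start();
    }

    // Blocking close: waits for the stream thread to terminate.
    // If this ran on the stream thread itself, join() would wait forever.
    void close() throws InterruptedException {
        streamThread.join();
        closed.countDown();
    }

    boolean awaitClosed(final long seconds) throws InterruptedException {
        return closed.await(seconds, TimeUnit.SECONDS);
    }
}

final class CloseFromHandlerSketch {
    static boolean runScenario() {
        final FakeStreamsClient client =
            new FakeStreamsClient(() -> { throw new IllegalStateException("simulated failure"); });
        client.setUncaughtExceptionHandler((thread, error) -> {
            // Do not call client.close() directly here: hand it to another thread
            // so the failing stream thread can finish and close() can return.
            final Thread closer = new Thread(() -> {
                try {
                    client.close();
                } catch (final InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "closer");
            closer.start();
        });
        client.start();
        try {
            return client.awaitClosed(5);
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

The design choice is to keep close() blocking and move the waiting off the failing thread, rather than adding a new asynchronous close API.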

Please have a look at the updated KIP and let me know what you think.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-663%3A+API+to+Start+and+Shut+Down+Stream+Threads

Best,
Bruno

On 26.08.20 16:31, Bruno Cadonna wrote:
> Hi,
> 
> I would like to propose the following KIP to start and shut down stream 
> threads during execution as well as to asynchronously shut down a Kafka 
> Streams client from an uncaught exception handler.
> 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-663%3A+API+to+Start+and+Shut+Down+Stream+Threads+and+to+Request+Closing+of+Kafka+Streams+Clients 
> 
> 
> Best,
> Bruno

Re: [DISCUSS] KIP-663: API to Start and Shut Down Stream Threads and to Request Closing of Kafka Streams Clients

Posted by "Matthias J. Sax" <mj...@apache.org>.
Thanks! SGTM.

-Matthias

On 9/3/20 3:17 AM, Bruno Cadonna wrote:
> Hi Matthias,
> 
> I replied inline.
> 
> Best,
> Bruno


Re: [DISCUSS] KIP-663: API to Start and Shut Down Stream Threads and to Request Closing of Kafka Streams Clients

Posted by Bruno Cadonna <br...@confluent.io>.
Hi Matthias,

I replied inline.

Best,
Bruno

On 02.09.20 22:06, Matthias J. Sax wrote:
> Thanks for updating the KIP.
> 
> Why do you propose to return `boolean` from addStreamThread() if the
> thread could not be started? As an alternative, we could also throw an
> exception if the client is not in state RUNNING? -- I guess both are
> valid options: just want to see what the pros/cons of each approach
> would be?
> 

I prefer to return a boolean because there is nothing exceptional about a 
stream thread not being added due to an inappropriate state. State 
changes are expected in Streams. Furthermore, users should not be forced 
to control their program flow by catching exceptions. Here is an example 
of each approach:

returning a boolean

while (!kafkaStreams.addStreamThread() &&
        kafkaStreams.state() != State.NOT_RUNNING &&
        kafkaStreams.state() != State.ERROR) {
    // not added in the current state; try again
}


throwing an exception

boolean added = false;
while (!added &&
        kafkaStreams.state() != State.NOT_RUNNING &&
        kafkaStreams.state() != State.ERROR) {

    try {
        kafkaStreams.addStreamThread();
        added = true;
    } catch (final Exception ex) {
        // not added in the current state; try again
    }
}

IMO the first example is more readable than the second.
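Both loops above spin without pausing. The following is a runnable sketch of the boolean-returning variant with a short back-off between attempts. `ClientState`, `ThreadControl`, and `AddThreadRetry` are hypothetical; the `boolean addStreamThread()` signature is the one under discussion in this thread, not a released Kafka API:

```java
// Hypothetical client states, named after those used in this thread.
enum ClientState { CREATED, REBALANCING, RUNNING, PENDING_SHUTDOWN, NOT_RUNNING, ERROR }

// Hypothetical interface mirroring the proposal: addStreamThread() returns false
// instead of throwing when the current state does not allow adding a thread.
interface ThreadControl {
    boolean addStreamThread();
    ClientState state();
}

final class AddThreadRetry {
    // Retries until a thread is added or the client reaches a terminal state.
    static boolean addWithRetry(final ThreadControl client, final long backoffMs) {
        while (true) {
            if (client.addStreamThread()) {
                return true;
            }
            final ClientState s = client.state();
            if (s == ClientState.NOT_RUNNING || s == ClientState.ERROR) {
                return false;
            }
            try {
                Thread.sleep(backoffMs); // avoid a hot spin while the state changes
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }
}
```

Because failure is a return value, the caller's control flow stays a plain loop with no try/catch.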


> Btw: should we allow adding a new thread if the state is REBALANCING,
> too? I actually don't see a reason why we should not allow this.
> 

I guess you are right. I will update the KIP and include REBALANCING.
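With REBALANCING included, the precondition for adding a thread could look like the sketch below. The enum mirrors the values of `KafkaStreams.State` as of Kafka 2.6; `AddThreadPrecondition` is a hypothetical helper encoding only the rule agreed on here:

```java
import java.util.EnumSet;
import java.util.Set;

// Client states, mirroring KafkaStreams.State as of Kafka 2.6.
enum StreamsState { CREATED, REBALANCING, RUNNING, PENDING_SHUTDOWN, NOT_RUNNING, ERROR }

final class AddThreadPrecondition {
    // Rule from this discussion: adding is allowed while RUNNING or REBALANCING.
    private static final Set<StreamsState> ADD_ALLOWED =
        EnumSet.of(StreamsState.RUNNING, StreamsState.REBALANCING);

    static boolean canAddStreamThread(final StreamsState state) {
        return ADD_ALLOWED.contains(state);
    }
}
```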


> For removeStreamThread(), might it be worth actually guaranteeing that
> the thread with the largest index is stopped instead of leaving it
> unspecified? It does not seem to be a big burden on the implementation
> and given that we plan to reuse indices of dead threads, it might be
> nice to have a contract? Or would there be any negative impact if we
> guarantee it?
> 

I left unspecified which stream thread is removed since I could not find 
any good reason for a guarantee. Also, from your comment I do not see what 
advantage we would gain by guaranteeing that the stream thread with 
the largest index is stopped. It would not guarantee that the next added 
stream thread would get the largest index, because another stream thread 
with a lower index could have failed in the meantime, and then two 
indices would be up for grabs.
Leaving unspecified which stream thread is removed also gives us the 
freedom to choose the stream thread to remove based on other 
criteria, for example, the one with the least local state.
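The index argument above can be illustrated with a hypothetical allocator that hands out the smallest free index. That reuse policy is an assumption for this sketch; the thread only says indices of dead threads would be reused:

```java
import java.util.TreeSet;

// Hypothetical allocator that reuses the smallest free stream-thread index.
final class ThreadIndexAllocator {
    private final TreeSet<Integer> inUse = new TreeSet<>();

    int allocate() {
        int candidate = 1;
        for (final int index : inUse) { // iterates in ascending order
            if (index != candidate) {
                break; // found a gap below the current maximum
            }
            candidate++;
        }
        inUse.add(candidate);
        return candidate;
    }

    void release(final int index) {
        inUse.remove(index);
    }
}
```

With threads 1, 2, and 3 running, stopping thread 3 and then losing thread 1 to a failure leaves both 1 and 3 free, so the next added thread gets index 1 even though the largest index was removed.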


> Another thought: should we add a parameter `numberOfThreads` to each
> method to allow users to start/stop multiple threads at once?
> 

I would keep it simple for now and add overloads if users request them.
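If users do ask for bulk operations, one possible shape is a helper on top of the single-thread methods rather than new client methods. `StreamThreadOps` and `BulkThreadOps` are hypothetical names for this sketch:

```java
// Hypothetical single-thread operations as proposed in this thread.
interface StreamThreadOps {
    boolean addStreamThread();
    boolean removeStreamThread();
}

final class BulkThreadOps {
    // Adds up to 'count' threads; stops at the first refusal and returns how many were added.
    static int addStreamThreads(final StreamThreadOps ops, final int count) {
        int added = 0;
        while (added < count && ops.addStreamThread()) {
            added++;
        }
        return added;
    }

    // Removes up to 'count' threads; returns how many were actually removed.
    static int removeStreamThreads(final StreamThreadOps ops, final int count) {
        int removed = 0;
        while (removed < count && ops.removeStreamThread()) {
            removed++;
        }
        return removed;
    }
}
```

Returning a count instead of a boolean lets the caller see a partial success.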


> What happens if there are zero running threads and one calls
> removeStreamThread()? Should we also add a `boolean` flag and return
> `false` for this case (or throw an exception)?
> 

Yes, I think this is a good idea, since it makes it easy to remove all 
threads programmatically. However, I would not throw an exception for the 
reasons I pointed out above.
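A sketch of a removeStreamThread() that returns `false` when no thread is left, and that picks the thread with the least local state as mentioned earlier. `StreamThreadInfo`, `StreamThreadRegistry`, and the `localStateBytes` field are hypothetical:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Hypothetical bookkeeping for one stream thread.
final class StreamThreadInfo {
    final String name;
    final long localStateBytes;

    StreamThreadInfo(final String name, final long localStateBytes) {
        this.name = name;
        this.localStateBytes = localStateBytes;
    }
}

final class StreamThreadRegistry {
    private final List<StreamThreadInfo> running = new ArrayList<>();

    void add(final StreamThreadInfo info) {
        running.add(info);
    }

    int size() {
        return running.size();
    }

    boolean contains(final String name) {
        return running.stream().anyMatch(t -> t.name.equals(name));
    }

    // Returns false instead of throwing when there is no thread left to remove.
    // Which thread is removed is an implementation choice; here: least local state.
    boolean removeStreamThread() {
        final Optional<StreamThreadInfo> victim = running.stream()
            .min(Comparator.comparingLong(t -> t.localStateBytes));
        victim.ifPresent(running::remove);
        return victim.isPresent();
    }
}
```

A loop such as `while (registry.removeStreamThread()) { }` then removes all threads without any exception handling.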


> 
> For the metric name, I would prefer "failed" over "crashed". Thoughts?
> 

I like "failed" more than "crashed", and it is also more 
consistent with other parts of the code, like 
ProductionExceptionHandlerResponse.FAIL.
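The intended semantics of a "failed-stream-threads" metric, counting threads that die from an exception but not threads stopped on purpose, can be sketched with a plain counter fed by an uncaught exception handler. `FailedThreadCounter` is hypothetical and does not use the Kafka metrics API:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical counter behind a "failed-stream-threads" metric: it counts threads that
// terminate with an uncaught exception, not threads that return normally.
final class FailedThreadCounter {
    private final AtomicInteger failed = new AtomicInteger();

    // Starts a worker whose uncaught exceptions are counted as failures.
    Thread startWorker(final Runnable work) {
        final Thread t = new Thread(work);
        t.setUncaughtExceptionHandler((thread, error) -> failed.incrementAndGet());
        t.start();
        return t;
    }

    int failedCount() {
        return failed.get();
    }
}
```

A thread stopped through removeStreamThread() would end by returning normally, so it never reaches the handler and is not counted.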


> 
> Side remark for the PR: can we make sure to update the description of
> `num.stream.threads` to explain that it's the _initial_ number of
> threads on startup?
> 

Good point!
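To illustrate the "initial" wording: `num.stream.threads` is the real Kafka Streams config key (default 1), but the `StreamThreadCount` class below is a hypothetical sketch showing that the config only seeds the count, while the live count moves with add/remove calls:

```java
import java.util.Properties;

// Hypothetical sketch: num.stream.threads only seeds the thread count at startup;
// afterwards the live count changes with add/remove calls and the config is not updated.
final class StreamThreadCount {
    private final int initial;
    private int current;

    StreamThreadCount(final Properties config) {
        // "num.stream.threads" is the Kafka Streams config key; it defaults to 1.
        initial = Integer.parseInt(config.getProperty("num.stream.threads", "1"));
        current = initial;
    }

    void onThreadAdded() {
        current++;
    }

    void onThreadRemoved() {
        if (current > 0) {
            current--;
        }
    }

    int initial() {
        return initial;
    }

    int current() {
        return current;
    }
}
```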

> 
> -Matthias

Re: [DISCUSS] KIP-663: API to Start and Shut Down Stream Threads and to Request Closing of Kafka Streams Clients

Posted by "Matthias J. Sax" <mj...@apache.org>.
Thanks for updating the KIP.

Why do you propose to return `boolean` from addStreamThread() if the
thread could not be started? As an alternative, we could also throw an
exception if the client is not in state RUNNING? -- I guess both are
valid options: just want to see what the pros/cons of each approach
would be?

Btw: should we allow adding a new thread if the state is REBALANCING,
too? I actually don't see a reason why we should not allow this.

For removeStreamThread(), might it be worth actually guaranteeing that
the thread with the largest index is stopped instead of leaving it
unspecified? It does not seem to be a big burden on the implementation
and given that we plan to reuse indices of dead threads, it might be
nice to have a contract? Or would there be any negative impact if we
guarantee it?

Another thought: should we add a parameter `numberOfThreads` to each
method to allow users to start/stop multiple threads at once?

What happens if there are zero running threads and one calls
removeStreamThread()? Should we also add a `boolean` flag and return
`false` for this case (or throw an exception)?


For the metric name, I would prefer "failed" over "crashed". Thoughts?


Side remark for the PR: can we make sure to update the description of
`num.stream.threads` to explain that it's the _initial_ number of
threads on startup?


-Matthias


On 9/1/20 2:01 PM, Walker Carlson wrote:
> Hi Bruno,
> 
> I read through your updated KIP and it looks good to me. I agree with
> adding the metric to keep track of crashed streams in place of a list of
> dead streams.
> 
> Best,
> Walker :)


Re: [DISCUSS] KIP-663: API to Start and Shut Down Stream Threads and to Request Closing of Kafka Streams Clients

Posted by Walker Carlson <wc...@confluent.io>.
Hi Bruno,

I read through your updated KIP and it looks good to me. I agree with
adding the metric to keep track of crashed streams in place of a list of
dead streams.

Best,
Walker :)

On Tue, Sep 1, 2020 at 1:05 PM Bruno Cadonna <br...@confluent.io> wrote:

> Hi John,
>
> Your proposal makes sense! I will update the KIP.
>
> Best,
> Bruno

Re: [DISCUSS] KIP-663: API to Start and Shut Down Stream Threads and to Request Closing of Kafka Streams Clients

Posted by Bruno Cadonna <br...@confluent.io>.
Hi John,

Your proposal makes sense! I will update the KIP.

Best,
Bruno

On 01.09.20 17:31, John Roesler wrote:
> Hello Bruno,
> 
> Thanks for the update! The KIP looks good to me; I only have
> a grammatical complaint about the proposed metric name.
> 
> "Died" is a verb, the past tense of "to die", but in the
> expression "x stream threads", x should be an adjective. To
> be fair, "died" is also the past participle of "to die", and
> participles can usually be used as adjectives. Maybe it
> sounds wrong to me because there's already a specifically
> adjectival form: "dead". So "dead-stream-threads" seems more
> natural.
> 
> However, I'm not sure if that captures the specific meaning
> you're shooting for, namely that the metric counts only the
> threads that died exceptionally, as opposed to threads stopped by
> calling "removeStreamThread()". What do you think of
> "crashed-stream-threads" instead?
> 
> Thanks,
> -John

Re: [DISCUSS] KIP-663: API to Start and Shut Down Stream Threads and to Request Closing of Kafka Streams Clients

Posted by John Roesler <vv...@apache.org>.
Hello Bruno,

Thanks for the update! The KIP looks good to me; I only have
a grammatical complaint about the proposed metric name.

"Died" is a verb, the past tense of "to die", but in the
expression "x stream threads", x should be an adjective. To
be fair, "died" is also the past participle of "to die", and
participles can usually be used as adjectives. Maybe it
sounds wrong to me because there's already a specifically
adjectival form: "dead". So "dead-stream-threads" seems more
natural.

However, I'm not sure if that captures the specific meaning
you're shooting for, namely that the metric counts only the
threads that died exceptionally, as opposed to threads stopped by
calling "removeStreamThread()". What do you think of
"crashed-stream-threads" instead?

Thanks,
-John

On Tue, 2020-09-01 at 11:30 +0200, Bruno Cadonna wrote:
> Hi,
> 
> I updated the KIP with the feedback so far. I removed the API to close 
> the Kafka Streams client asynchronously, since it should be possible to 
> avoid the deadlock with the existing method and without a KIP.
> 
> Please have a look at the updated KIP and let me know what you think.
> 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-663%3A+API+to+Start+and+Shut+Down+Stream+Threads
> 
> Best,
> Bruno