Posted to dev@kafka.apache.org by Guozhang Wang <wa...@gmail.com> on 2019/06/12 20:28:58 UTC

Re: KIP-457: Add DISCONNECTED state to Kafka Streams

Hi Richard,

Sorry for getting to this late; I've finally got some time to take a look
at https://github.com/apache/kafka/pull/6594 as well as the KIP itself.
Here are some thoughts:

1. The main motivation of this KIP is to be able to distinguish between
three cases:

a. "Streams client is in an unhealthy situation and hence cannot proceed"
(for which we have the ERROR state),
b. "Streams client is perfectly healthy, but it cannot get to the target
brokers and hence cannot proceed", and
c. "both Streams and brokers are healthy, there's just no data available
for processing and hence cannot proceed".

And we want to have a way to notify users about the second case, b),
as distinct from the others.

2. Following this, when I first thought about the solution I was thinking
about adding a new state to the FSM of Kafka Streams, but after reviewing
the code and the KIP, I feel that complicating the FSM may be overkill.
Now I'm wondering if we can achieve the same thing with a single metric.
For example:

2.a) We know that Streams always relies on consumer membership to
allocate partitions to instances, which means that the heartbeat thread has
to be working if the consumer is ever to receive any data. So what we can
do is let users monitor the consumer's heartbeat metric directly: e.g. if the
heartbeat-rate drops to zero BUT the state is still RUNNING, it means we
are in case b) above.
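
A minimal sketch of the rule of thumb in 2.a), written as plain Java with no
Kafka dependency. The class, enum, and method names are hypothetical and are
not part of Kafka. It assumes the caller has already read the Streams state
name (e.g. from KafkaStreams#state()) and the consumer's heartbeat-rate metric
value; how those values are fetched is left out on purpose.

```java
// Hypothetical helper, not part of Kafka: it only encodes the decision rule
// from 2.a). The inputs are assumed to come from KafkaStreams#state() and
// from the consumer's heartbeat-rate metric.
final class HeartbeatDiagnosis {

    enum Diagnosis { CLIENT_UNHEALTHY, LIKELY_DISCONNECTED, NO_DISCONNECT_SIGNAL }

    static Diagnosis diagnose(String streamsState, double heartbeatRate) {
        if ("ERROR".equals(streamsState)) {
            // Case a): the Streams client itself is unhealthy.
            return Diagnosis.CLIENT_UNHEALTHY;
        }
        if ("RUNNING".equals(streamsState) && heartbeatRate <= 0.0) {
            // Case b): the client looks healthy but no heartbeats are going out.
            // A NaN rate fails this comparison and is treated as "no signal".
            return Diagnosis.LIKELY_DISCONNECTED;
        }
        // Everything else, including case c) (healthy but no data to process).
        return Diagnosis.NO_DISCONNECT_SIGNAL;
    }
}
```

Note that this check alone cannot tell case c) apart from normal processing;
it only flags the combination that points at case b).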

2.b) If we want to provide a Streams-level metric out of the box rather
than having users monitor consumer metrics, another idea is to
leverage the existing "public Set<TopicPartition> assignment()" of
KafkaConsumer and record the time when it returns empty, meaning that
nothing was assigned. We could then expose this as a boolean metric indicating
that nothing was assigned and hence that we are likely in case b) above. Note this
could also mean that we have fewer partitions than necessary so that some
instance indeed does not have any assignment, which is not the same as b),
but I feel consolidating these two cases into a single metric is also fine.
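
The bookkeeping behind 2.b) could look roughly like the sketch below. It is
plain Java with hypothetical names and is not an existing Kafka Streams
metric. Set<String> stands in for the Set<TopicPartition> returned by
KafkaConsumer#assignment(), and the caller is assumed to pass in the current
time whenever it samples the assignment.

```java
import java.util.Set;

// Hypothetical tracker, not part of Kafka: remembers when the assignment
// first became empty so that a boolean "nothing assigned" value can be
// exposed, along with how long that has been the case.
final class EmptyAssignmentTracker {

    // -1 means the most recently recorded assignment was non-empty.
    private long emptySinceMs = -1L;

    // Call with the latest assignment snapshot and the current time.
    void record(Set<String> assignment, long nowMs) {
        if (assignment.isEmpty()) {
            if (emptySinceMs < 0) {
                emptySinceMs = nowMs; // keep the first time it was seen empty
            }
        } else {
            emptySinceMs = -1L;       // an assignment exists again, so reset
        }
    }

    // The boolean value suggested in 2.b).
    boolean nothingAssigned() {
        return emptySinceMs >= 0;
    }

    // How long the assignment has been empty; 0 if it is not empty.
    long emptyForMs(long nowMs) {
        return emptySinceMs < 0 ? 0L : nowMs - emptySinceMs;
    }
}
```

As noted in 2.b), a true value here does not prove a broker outage: an
instance that legitimately received no partitions would report the same thing.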



Guozhang




On Wed, Apr 17, 2019 at 2:30 PM Richard Yu <yo...@gmail.com>
wrote:

> Alright, so I made a few changes to the KIP.
> I realized that there might be an easier way to give the user information
> on the connection state of Kafka Streams.
> In implementation, if one wishes to have DISCONNECTED as a state, then one
> would have to factor in proper state transitions.
> The other approach is now outlined in the KIP: instead, we could just
> add a method, which I think achieves the same effect.
> If any of you thinks there is something wrong with this approach, please let me know.
> :)
>
> Cheers,
> Richard
>
> On Wed, Apr 17, 2019 at 11:49 AM Richard Yu <yo...@gmail.com>
> wrote:
>
> > I just realized something.
> >
> > Hi Matthias, might need your input here.
> > I realized that when implementing this change, as noted in the JIRA, we
> > would need to "check the behaviour of the consumer" since it's the consumer's
> > connection with the broker that we are dealing with.
> >
> > So doesn't that mean we would also be dealing with consumer API changes
> > as well?
> > I don't think the consumer has any methods which would give us the state of a
> > connection either.
> >
> > - Richard
> >
> > On Wed, Apr 17, 2019 at 8:43 AM Richard Yu <yo...@gmail.com>
> > wrote:
> >
> >> Hi Michael,
> >>
> >> Yeah, those are some points I should've clarified.
> >> No problem. Have got it done.
> >>
> >>
> >>
> >> On Wed, Apr 17, 2019 at 6:42 AM Michael Noll <mi...@confluent.io>
> >> wrote:
> >>
> >>> Richard,
> >>>
> >>> thanks for looking into this!
> >>>
> >>> However, I have some concerns. The KIP you created (
> >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-457%3A+Add+DISCONNECTED+status+to+Kafka+Streams
> >>> )
> >>> doesn't yet address open questions such as the ones mentioned by
> >>> Matthias:
> >>>
> >>> 1) What is the difference between DEAD and the proposed DISCONNECTED?
> >>> This should be defined in the KIP.
> >>>
> >>> 2) Difference between your KIP and the JIRA (
> >>> https://issues.apache.org/jira/browse/KAFKA-6520): In the JIRA ticket,
> >>> the DISCONNECTED state was proposed for the scenario that the KStreams
> >>> application is healthy but the Kafka broker is down. This is different
> >>> to what you wrote in the KIP: "When something happens in Kafka Streams,
> >>> such as an unexpected crash or error, KafkaStreams#state() will return
> >>> State.DISCONNECTED.", which seems to mean that DISCONNECTED should be
> >>> the state when the KStreams app is down.
> >>>
> >>> I wouldn't expect a KIP vote to pass if these basic questions aren't
> >>> properly sorted out in the KIP.
> >>>
> >>> Best,
> >>> Michael
> >>>
> >>>
> >>>
> >>> On Wed, Apr 17, 2019 at 3:35 AM Richard Yu <yohan.richard.yu@gmail.com>
> >>> wrote:
> >>>
> >>> > Hi all,
> >>> >
> >>> > Considering that this is a simple KIP, I would probably start the
> >>> > voting tomorrow.
> >>> > I think it would be good if we could get this in fast.
> >>> >
> >>> > On Tue, Apr 16, 2019 at 3:31 PM Richard Yu <yohan.richard.yu@gmail.com>
> >>> > wrote:
> >>> >
> >>> > > Oh, I probably misunderstood the difference between DISCONNECTED
> >>> > > and DEAD.
> >>> > > I will update the KIP accordingly.
> >>> > > Thanks for pointing that out!
> >>> > >
> >>> > >
> >>> > > On Tue, Apr 16, 2019 at 3:13 PM Matthias J. Sax <matthias@confluent.io>
> >>> > > wrote:
> >>> > >
> >>> > >> Thanks for the initiative.
> >>> > >>
> >>> > >> In the motivation you mention that you want to use DISCONNECT to
> >>> > >> indicate that the application was killed.
> >>> > >>
> >>> > >> What is the difference to existing state DEAD?
> >>> > >>
> >>> > >> Also, the backing JIRA seems to have a different motivation to
> >>> > >> add a DISCONNECT state. There, the Kafka Streams application itself is
> >>> > >> healthy, but it cannot connect to the brokers. It seems reasonable
> >>> > >> to add a DISCONNECT for this case though.
> >>> > >>
> >>> > >>
> >>> > >>
> >>> > >> -Matthias
> >>> > >>
> >>> > >>
> >>> > >>
> >>> > >> On 4/16/19 9:30 AM, Richard Yu wrote:
> >>> > >> > Hi all,
> >>> > >> >
> >>> > >> > I'd like to propose a small KIP on adding a new state to
> >>> > >> > KafkaStreams#state().
> >>> > >> > It is very simple, so this should pass relatively quickly!
> >>> > >> > Here is the discussion link:
> >>> > >> >
> >>> > >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-457%3A+Add+DISCONNECTED+status+to+Kafka+Streams
> >>> > >> >
> >>> > >> > Cheers,
> >>> > >> > Richard
> >>> > >> >
> >>> > >>
> >>> > >>
> >>> >
> >>>
> >>
>


-- 
-- Guozhang

Re: KIP-457: Add DISCONNECTED state to Kafka Streams

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Moved this KIP into status "inactive". Feel free to resume at any time.

-Matthias


On 6/27/19 4:37 PM, Richard Yu wrote:
>  Hi Matthias and Hachikuji,
> Sorry for getting back to you so late. I'm currently on a trip, so I haven't had the time to respond.
> Currently, I'm not sure which approach we should take, considering that Guozhang posed multiple possibilities in the previous email. Do you have any preferences as to which approach we should take?
> It would greatly help in the implementation of the issue.
> Cheers,
> Richard


Re: KIP-457: Add DISCONNECTED state to Kafka Streams

Posted by Richard Yu <yr...@yahoo.com.INVALID>.
Hi Matthias and Hachikuji,
Sorry for getting back to you so late. I'm currently on a trip, so I haven't had the time to respond.
Currently, I'm not sure which approach we should take, considering that Guozhang posed multiple possibilities in the previous email. Do you have any preferences as to which approach we should take?
It would greatly help in the implementation of the issue.
Cheers,
Richard
    On Thursday, June 13, 2019, 4:55:29 PM GMT+8, Richard Yu <yr...@yahoo.com.INVALID> wrote:  
 
  Hi Guozhang,
Thanks for the input! From the approaches you have listed above, I guess no API changes will be needed in the Kafka consumer. That will greatly simplify things, although some unexpected issues might show up when implementing these approaches.
Cheers,
Richard

Re: KIP-457: Add DISCONNECTED state to Kafka Streams

Posted by Richard Yu <yr...@yahoo.com.INVALID>.
Hi Guozhang,
Thanks for the input! From the approaches you have listed above, I guess no API changes will be needed in the Kafka consumer. That will greatly simplify things, although some unexpected issues might show up when implementing these approaches.
Cheers,
Richard