Posted to dev@kafka.apache.org by "Matthias J. Sax" <mj...@apache.org> on 2020/02/22 06:51:26 UTC

[DISCUSS] KIP-572: Improve timeouts and retries in Kafka Streams

Hi,

I would like to propose KIP-572 to make Kafka Streams more robust with
regard to timeout exception handling.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-572%3A+Improve+timeouts+and+retries+in+Kafka+Streams

Note that there is a long list of rejected alternatives that can be
used as a starting point for the discussion. In fact, I am not sure
whether one of the listed alternatives might be better than the current
proposal -- I just had to pick one design for now (the reason why I
picked the current design is that it's semantically fully backward
compatible and does not introduce any new configs).

Looking forward to your feedback.

-Matthias

Re: [DISCUSS] KIP-572: Improve timeouts and retries in Kafka Streams

Posted by "Matthias J. Sax" <mj...@apache.org>.
Thanks for the feedback Bruno. I updated the KIP (I thought the info was
there, but it seems it was too implicit).


1) Each time a client TimeoutException happens, we would log a WARN
message. If `task.timeout.ms` expires, we would rethrow the last client
`TimeoutException` to stop processing.


2) Yes, the first client `TimeoutException` will start the timer, and a
successful retry would reset/disable it.
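
The timer semantics in points 1) and 2) can be sketched roughly as follows.
This is only a minimal illustration, not the actual Kafka Streams code: the
class names `TaskTimeoutTracker` and `ClientTimeoutException` are
hypothetical stand-ins, the constructor argument plays the role of
`task.timeout.ms`, and logging goes to stderr instead of a real logger.

```java
/** Hypothetical stand-in for the client's TimeoutException (illustration only). */
final class ClientTimeoutException extends RuntimeException {
    ClientTimeoutException(final String message) {
        super(message);
    }
}

/** Hypothetical per-task timer; an assumption-based sketch, not the real implementation. */
final class TaskTimeoutTracker {
    private static final long NOT_STARTED = -1L;

    private final long taskTimeoutMs;       // plays the role of `task.timeout.ms`
    private long deadlineMs = NOT_STARTED;  // NOT_STARTED means the timer is disabled

    TaskTimeoutTracker(final long taskTimeoutMs) {
        this.taskTimeoutMs = taskTimeoutMs;
    }

    /** Call whenever a client call made on behalf of this task times out. */
    void onTimeout(final long nowMs, final ClientTimeoutException lastError) {
        // Every client timeout is logged at WARN level.
        System.err.println("WARN client timeout, task will be retried later: "
            + lastError.getMessage());
        if (deadlineMs == NOT_STARTED) {
            // The first timeout starts the timer.
            deadlineMs = nowMs + taskTimeoutMs;
        } else if (nowMs >= deadlineMs) {
            // The timeout expired: rethrow the last client exception to stop processing.
            throw lastError;
        }
    }

    /** Call when a retry succeeds: resets/disables the timer. */
    void onSuccess() {
        deadlineMs = NOT_STARTED;
    }

    boolean isTimerRunning() {
        return deadlineMs != NOT_STARTED;
    }
}
```

In this sketch the caller passes in the current time, so the deadline logic
can be exercised without sleeping.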


-Matthias

On 5/14/20 9:19 AM, John Roesler wrote:
> Thanks for the update, Matthias!
> 
> Other than Bruno’s good points, this proposal looks good to me. 
> 
> Thanks,
> John
> 
> On Thu, May 14, 2020, at 07:17, Bruno Cadonna wrote:
>> Hi Matthias,
>>
>> Thank you for the KIP. I like your KIP.
>>
>> Here is my feedback:
>>
>> 1. The KIP is not clear about what should happen when task.timeout.ms
>> expires. To facilitate the mapping from the error users might
>> encounter due to timeouts to this KIP, it would be good to state the
>> error that will be thrown when task.timeout.ms expires.
>>
>> 2. The KIP also does not clearly state how task.timeout.ms is
>> measured. Does the time start with the first timeout exception and
>> then run until either the timeout expires or the task receives a
>> successful reply? Or is it started each time the task is processed by
>> the stream thread and stopped when its turn is over, with an error
>> thrown once the sum of the individual times without a successful reply
>> reaches the timeout?
>>
>> Best,
>> Bruno
>>
>> On Tue, May 12, 2020 at 10:14 PM Matthias J. Sax <mj...@apache.org> wrote:
>>>
>>> John, Guozhang,
>>>
>>> thanks a lot for your feedback. I updated the KIP from a slightly
>>> different angle: instead of using retries, we should switch to a
>>> timeout-based approach. I also extended the KIP to deprecate the
>>> producer/admin `retries` config.
>>>
>>> I like the proposal to skip a task if a client TimeoutException occurs
>>> and just retry it later; I updated the KIP accordingly. However, I would
>>> not retry forever by default. In general, all points you raised are
>>> valid and the question is just what _default_ we want to have. Given
>>> the issue that tasks might get "out-of-sync" regarding their event-time
>>> progress and that inexperienced users might not do proper monitoring, I
>>> prefer to have a "reasonable" default timeout for a task that does not
>>> make progress at all, and to fail in this case.
>>>
>>> I would also argue (following Guozhang) that we don't necessarily need
>>> new metrics. Monitoring the number of alive threads (recently added),
>>> consumer lag, processing rate, etc. should give an operator enough insight
>>> into the application. I don't see the need atm to add specific "task
>>> timeout" metrics.
>>>
>>> For the issue of cascading failures, I would want to exclude it from
>>> this KIP to keep it focused.
>>>
>>>
>>> -Matthias
>>>
>>>
>>> On 2/27/20 1:31 PM, Guozhang Wang wrote:
>>>> Hello John,
>>>>
>>>> I'll make note that you owe me a beer now :)
>>>>
>>>> I think I'm leaning towards your approach as well, based on my
>>>> observations of previously reported timeout exceptions. I once left some
>>>> thoughts on Matthias' PR here
>>>> https://github.com/apache/kafka/pull/8122#pullrequestreview-360749510
>>>> and I think I can better summarize my thoughts in this thread:
>>>>
>>>> 1) First of all, we need to think, from the user's perspective, about
>>>> what they'd really want to be notified of:
>>>>
>>>> a. "If my application cannot make progress temporarily due to various
>>>> transient issues (John had listed several examples already), just handle
>>>> that internally and I do not wanna be notified and worry about how to tune
>>>> my timeout configs at all".
>>>> b. "If my application cannot make progress for a long time, which is
>>>> likely due to a bad config, a human error, network issues, etc., such
>>>> that I should be involved in the loop of troubleshooting, let me know
>>>> sooner rather than later".
>>>>
>>>> and what they would not prefer but may happen today:
>>>>
>>>> c. "One transient error causes a thread to die, but then after tasks
>>>> migrate everything goes back to normal; so the application silently lost
>>>> a thread without letting me know"; in fact, in such cases even a
>>>> cascading exception that eventually kills all threads may be better,
>>>> since at least the users would be notified.
>>>>
>>>> Based on that, instead of retrying immediately at the granularity of each
>>>> blocking call, it should be sufficient to only consider the handling logic
>>>> at the thread level. That is, within an iteration of the thread, it would
>>>> try to:
>>>>
>>>> * initialize some created tasks;
>>>> * restore some restoring tasks;
>>>> * process some running tasks that have buffered records that are
>>>> processable;
>>>> * commit some tasks.
>>>>
>>>> In each of these steps, we may need to make some blocking calls in the
>>>> underlying embedded clients, and if any of them times out, we would be
>>>> able to make only partial progress, or no progress at all. If we still
>>>> want to set a configured value of "retries", I think a better idea would
>>>> be to say "if we cannot make progress for N consecutive iterations of a
>>>> thread, the user should be notified".
>>>>
>>>> ---------------
>>>>
>>>> 2) Second, let's consider what's a good way to notify the user. Today our
>>>> way of notification is simple: throw the exception all the way up to the
>>>> user's uncaught-exception-handler (if it's registered) and let the thread
>>>> die. I'm wondering if we could instead educate users to watch some key
>>>> metrics as "progress indicators" rather than relying on the exception
>>>> handler. Some candidates in mind:
>>>>
>>>> * consumer-lag: this applies to both source topics and repartition topics.
>>>> It indicates whether one or more of the tasks within each sub-topology is
>>>> lagging, in the case where *some or all* of the threads cannot make
>>>> progress. E.g. if a downstream task's thread is blocked somehow while its
>>>> upstream task's thread is not, then the consumer-lag on the repartition
>>>> topic would keep growing.
>>>>
>>>> * *idle* state: this is an idea we discussed in
>>>> https://issues.apache.org/jira/browse/KAFKA-6520, i.e. to introduce an
>>>> instance-level new state, if all threads of the instance cannot make
>>>> progress (primarily for the reason that it cannot talk to the brokers).
>>>>
>>>> * process-rate: this is at thread-level. However, if some tasks cannot make
>>>> progress while others can still make progress within a thread, its
>>>> process-rate would not drop to zero, so it is a bit harder to interpret
>>>> than consumer-lag.
>>>>
>>>> If we feel that relying on metrics is better than throwing the exception
>>>> and letting the thread die, then we would not need to have the "retry"
>>>> config either.
>>>>
>>>> ---------------
>>>>
>>>> 3) This may be only semi-related to the timeout itself, but as I mentioned,
>>>> one common issue today that we would need to resolve is losing a thread BUT
>>>> not losing the whole instance. In other words, we should consider, when we
>>>> have to throw an exception from a thread (not due to timeouts, but say due
>>>> to some fatal error), whether we should just kill the corresponding thread
>>>> or be more brutal and kill the whole instance instead. I'm happy to
>>>> defer this to another discussion thread; I just want to bring it up here.
>>>>
>>>>
>>>>
>>>> Guozhang
>>>>
>>>>
>>>> On Thu, Feb 27, 2020 at 10:40 AM John Roesler <vv...@apache.org> wrote:
>>>>
>>>>> Hi Matthias,
>>>>>
>>>>> Thanks for the proposal! I think this will be a wonderful improvement
>>>>> to Streams. In particular, thanks for the motivation. It would indeed
>>>>> be nice not to have to set long timeout configs and block individual
>>>>> client requests in order to cope with transient slow responses.
>>>>>
>>>>> I'm very well aware that this might make me sound like a crazy person,
>>>>> but one alternative I'd like to consider is not bounding the retries at
>>>>> all. Instead, Streams would just skip over timed-out tasks and try again
>>>>> on the next iteration, as you proposed, but would continue to do so
>>>>> indefinitely. Clearly, we shouldn't do such a thing silently, so I'd
>>>>> further propose to log a warning every time a task times out and also
>>>>> maintain a new metric indicating task timeouts.
>>>>>
>>>>> To see why this might be attractive, let me pose a hypothetical
>>>>> installation which has thousands of Streams instances, maybe as part of
>>>>> hundreds of applications belonging to dozens of teams. Let's also assume
>>>>> there is a single broker cluster serving all these instances. Such an
>>>>> environment has a number of transient failure modes:
>>>>> * A single broker instance may become slow to respond due to hardware
>>>>> failures (e.g., a bad NIC) or other environmental causes (CPU competition
>>>>> with co-resident processes, long JVM GC pauses, etc.). Single-broker
>>>>> unavailability could cause some tasks to time out while others can
>>>>> proceed in an individual Streams instance.
>>>>> * The entire broker cluster could become temporarily unavailable
>>>>> (consider: a faulty firewall configuration gets deployed, severing all
>>>>> Streams instances from the brokers).
>>>>> * A faulty security configuration may temporarily sever a whole
>>>>> application from the brokers.
>>>>> * Any number of causes could likewise sever a single instance in a single
>>>>> application from all brokers.
>>>>> * Finally, networking problems can disconnect arbitrary pairs of Streams
>>>>> instances and broker instances.
>>>>>
>>>>> This is not an accounting of all possible failure modes, obviously, but
>>>>> the point is that, in a large, decentralized organization, you can
>>>>> experience lots of transient failures that have some features in common:
>>>>> F1. It's someone else's fault, and someone else must take action to fix it.
>>>>> F2. It will take "human time" to fix it. I.e., hours, not milliseconds.
>>>>> F3. A single failure can affect "everyone" (e.g., one broker with long GCs
>>>>> can cause timeouts in all thousands of instances over all dozens of teams).
>>>>>
>>>>> As an operator belonging to one team, whether we bound retries or not,
>>>>> I would need to be alerted when the app stops making progress, I'd need
>>>>> to investigate, and in the above cases, I'd need to escalate to the
>>>>> network and/or broker infrastructure teams.
>>>>>
>>>>> Clearly, I can be alerted either by threads dying or by non-progress
>>>>> metrics. As a responsible operator, I'd have alerts on _both_, so we
>>>>> shouldn't consider either signal to be "louder" or more reliable than
>>>>> the other.
>>>>>
>>>>> A side observation: in a lot of the failure modes, a specific task won't
>>>>> be able to make progress no matter which thread or instance it's on
>>>>> (e.g., if the transaction coordinator for one of its output partitions is
>>>>> slow or unresponsive). Therefore, killing the thread with a bounded retry
>>>>> config would only result in a cascade of thread deaths across all my
>>>>> instances until either I run out of threads or the incident is resolved.
>>>>>
>>>>> The key questions to me are:
>>>>> Q1. Do we want to continue trying to make what progress we can while
>>>>> the incident is being investigated and remediated?
>>>>> Q2. Should I (the operator for a single team) have to take any action
>>>>> once the infrastructure failures are resolved?
>>>>>
>>>>> We can paraphrase these as, "do you want your business to grind to a halt
>>>>> due to a single failure?", and "do you want everyone to stay up all night
>>>>> waiting for a fix so they can all restart their applications?"
>>>>>
>>>>> Just from the operator/business perspective, it seems like we want:
>>>>> Q1:yes and Q2:no, which in combination with F1,2,3 above indicates
>>>>> to me that it would be better for Streams to just keep retrying
>>>>> indefinitely.
>>>>>
>>>>> There is one point I think you've mentioned to me in the past that it
>>>>> may not be _safe_ to just quit working on one specific task while
>>>>> progressing on others. If we have a repartition topic sourced by
>>>>> two tasks T1 and T2, and feeding a windowed aggregation (for example),
>>>>> then failing to process T1 while continuing on T2 for a long time
>>>>> would cause a lot of timestamp skew, and could ultimately result in all
>>>>> those delayed records in T1 being out of grace period by the time they
>>>>> get processed. Arguably, this is a completely normal and expected
>>>>> situation in a distributed system, which is why we have grace period to
>>>>> begin with, but since the cause of this particular skew is inside of
>>>>> Streams, it would be possible and nice to detect and avoid the situation.
>>>>>
>>>>> However, we should note that killing a single thread that hosts T1 will
>>>>> _not_ deterministically halt processing on T2, nor will stopping the
>>>>> single instance that hosts T1, since T2 might be on another instance.
>>>>> We would need a cluster-wide broadcast of some kind to either halt
>>>>> all processing on all tasks, or (more sophisticated) to halt processing
>>>>> on T2 when we detect non-progress of T1.
>>>>>
>>>>> Depending on the failure mode, it's possible that just shuffling the
>>>>> tasks around could let us start making progress again, for example when
>>>>> only a single Streams instance can't reach one or more brokers. However,
>>>>> this objective is not accomplished by just stopping one thread; we would
>>>>> need to relocate all tasks off the affected instance to attempt this
>>>>> remediation.
>>>>>
>>>>> A separate thing to point out is that an instance being unavailable
>>>>> for processing does not imply that it is also unavailable for querying.
>>>>> Especially in light of KIP-535, where we started to support querying
>>>>> "stale" stores, it seems worthwhile to keep the threads and instances
>>>>> alive, even if they pause processing.
>>>>>
>>>>> Well, if you've made it this far, congratulations. I owe you a beer. I
>>>>> hope you don't interpret the length of this email as reflective of my
>>>>> zeal. I'm also ok with a bounded retries config of some kind, but I
>>>>> wanted to be sure we've considered the above effects.
>>>>>
>>>>> Thanks,
>>>>> -John
>>>>>
>>>>>
>>>>> On Sat, Feb 22, 2020, at 00:51, Matthias J. Sax wrote:
>>>> Hi,
>>>>
>>>> I would like to propose KIP-572 to make Kafka Streams more robust with
>>>> regard to timeout exception handling.
>>>>
>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-572%3A+Improve+timeouts+and+retries+in+Kafka+Streams
>>>>
>>>> Note that there is a long list of rejected alternatives that can be
>>>> used as a starting point for the discussion. In fact, I am not sure
>>>> whether one of the listed alternatives might be better than the current
>>>> proposal -- I just had to pick one design for now (the reason why I
>>>> picked the current design is that it's semantically fully backward
>>>> compatible and does not introduce any new configs).
>>>>
>>>> Looking forward to your feedback.
>>>>
>>>> -Matthias


Re: [DISCUSS] KIP-572: Improve timeouts and retries in Kafka Streams

Posted by John Roesler <vv...@apache.org>.
Thanks for the update, Matthias!

Other than Bruno’s good points, this proposal looks good to me. 

Thanks,
John


Re: [DISCUSS] KIP-572: Improve timeouts and retires in Kafka Streams

Posted by Bruno Cadonna <br...@confluent.io>.
Hi Matthias,

Thank you for the KIP. I like your KIP.

Here is my feedback:

1. The KIP is not clear about what should happen when task.timeout.ms
expires. So that users can trace an error they encounter due to
timeouts back to this KIP, it would be good to state the error that
will be thrown when task.timeout.ms expires.

2. The KIP also does not clearly state how task.timeout.ms is
measured. Does the timer start with the first timeout exception and
then run until either the timeout expires or the task receives a
successful reply? Or is it started each time the stream thread begins
processing the task and stopped when the task's turn is over, with an
error thrown once the accumulated time without a successful reply
reaches the timeout?
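
For illustration only, the two readings in question 2 could be sketched
in Java as below. The class and method names (DeadlineTimer,
AccumulatingTimer, onTimeout, onFailedTurn) are hypothetical and come
neither from the KIP nor from Kafka Streams. The sketch only shows how the
two ways of measuring differ, not which one the KIP intends.

```java
// Hypothetical sketch: two ways a per-task timeout could be measured.
// None of these names exist in Kafka Streams.

/** Reading A: wall-clock time since the first timeout, reset on success. */
final class DeadlineTimer {
    private final long timeoutMs;
    private long firstTimeoutAtMs = -1L; // -1 means no timeout is pending

    DeadlineTimer(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    /** Record a timeout observed at nowMs; returns true once the deadline has passed. */
    boolean onTimeout(long nowMs) {
        if (firstTimeoutAtMs < 0) {
            firstTimeoutAtMs = nowMs; // the clock starts with the first timeout
        }
        return nowMs - firstTimeoutAtMs >= timeoutMs;
    }

    /** A successful reply clears the pending deadline. */
    void onSuccess() {
        firstTimeoutAtMs = -1L;
    }
}

/** Reading B: only the time spent in unsuccessful turns is added up. */
final class AccumulatingTimer {
    private final long timeoutMs;
    private long failedMs = 0L;

    AccumulatingTimer(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    /** Add the duration of one unsuccessful turn; returns true once the sum reaches the timeout. */
    boolean onFailedTurn(long turnDurationMs) {
        failedMs += turnDurationMs;
        return failedMs >= timeoutMs;
    }

    /** A successful reply resets the accumulated time. */
    void onSuccess() {
        failedMs = 0L;
    }
}
```

With a 1000 ms timeout, the first variant expires 1000 ms of wall-clock
time after the first timeout, no matter how often the task was attempted.
The second expires only once the failed turns themselves add up to 1000 ms.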

Best,
Bruno

On Tue, May 12, 2020 at 10:14 PM Matthias J. Sax <mj...@apache.org> wrote:
>
> John, Guozhang,
>
> thanks a lot for your feedback. I updated the KIP to take a slightly
> different angle: instead of using retries, we should switch to a
> timeout-based approach. I also extended the KIP to deprecate the
> producer/admin `retries` config.
>
> I like the proposal to skip a task if a client TimeoutException occurs
> and just retry it later; I updated the KIP accordingly. However, I would
> not retry forever by default. In general, all points you raised are
> valid and the question is just what _default_ we want to have. Given
> the issue that tasks might get "out-of-sync" regarding their event-time
> progress and that inexperienced users might not do proper monitoring, I
> prefer to have a "reasonable" default timeout and to fail if a task does
> not make progress at all within it.
>
> I would also argue (following Guozhang) that we don't necessarily need
> new metrics. Monitoring the number of alive threads (recently added),
> consumer lag, processing rate, etc. should give an operator enough insight
> into the application. I don't see the need at the moment to add specific
> "task timeout" metrics.
>
> For the issue of cascading failures, I would want to exclude it from
> this KIP to keep it focused.
>
>
> -Matthias
>
>

Re: [DISCUSS] KIP-572: Improve timeouts and retires in Kafka Streams

Posted by "Matthias J. Sax" <mj...@apache.org>.
John, Guozhang,

thanks a lot for your feedback. I updated the KIP to take a slightly
different angle: instead of using retries, we should switch to a
timeout-based approach. I also extended the KIP to deprecate the
producer/admin `retries` config.

I like the proposal to skip a task if a client TimeoutException occurs
and just retry it later; I updated the KIP accordingly. However, I would
not retry forever by default. In general, all points you raised are
valid and the question is just what _default_ we want to have. Given
the issue that tasks might get "out-of-sync" regarding their event-time
progress and that inexperienced users might not do proper monitoring, I
prefer to have a "reasonable" default timeout and to fail if a task does
not make progress at all within it.
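
A minimal Java sketch of this skip-and-retry behavior with a bounded
timeout, under stated assumptions: ClientTimeoutException is a stand-in
for the client's TimeoutException, tasks are modeled as plain Runnables
keyed by a task id, and SkippingTaskRunner and runOnce are hypothetical
names. It is not the actual Streams implementation.

```java
import java.util.HashMap;
import java.util.Map;

/** Stand-in for a client timeout; hypothetical, not the Kafka class. */
final class ClientTimeoutException extends RuntimeException {
    ClientTimeoutException(String message) {
        super(message);
    }
}

/** Hypothetical runner: skips timed-out tasks and fails only after a per-task timeout. */
final class SkippingTaskRunner {
    private final long taskTimeoutMs;
    // Task id -> time of the first timeout since the task's last success.
    private final Map<String, Long> firstTimeoutMs = new HashMap<>();

    SkippingTaskRunner(long taskTimeoutMs) {
        this.taskTimeoutMs = taskTimeoutMs;
    }

    /** One pass over all tasks; returns how many tasks made progress. */
    int runOnce(Map<String, Runnable> tasks, long nowMs) {
        int progressed = 0;
        for (Map.Entry<String, Runnable> entry : tasks.entrySet()) {
            String taskId = entry.getKey();
            try {
                entry.getValue().run();
                firstTimeoutMs.remove(taskId); // success clears the pending timeout
                progressed++;
            } catch (ClientTimeoutException e) {
                long since = firstTimeoutMs.computeIfAbsent(taskId, id -> nowMs);
                if (nowMs - since >= taskTimeoutMs) {
                    // No progress within the timeout: fail instead of retrying forever.
                    throw new IllegalStateException(
                        "Task " + taskId + " made no progress for " + (nowMs - since) + " ms", e);
                }
                // Otherwise skip this task and retry it on the next pass.
            }
        }
        return progressed;
    }
}
```

Other tasks keep making progress while one task is timing out, and the
runner only gives up on a task that has not succeeded once within the
configured timeout.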

I would also argue (following Guozhang) that we don't necessarily need
new metrics. Monitoring the number of alive threads (recently added),
consumer lag, processing rate, etc. should give an operator enough insight
into the application. I don't see the need at the moment to add specific
"task timeout" metrics.

For the issue of cascading failures, I would want to exclude it from
this KIP to keep it focused.


-Matthias


On 2/27/20 1:31 PM, Guozhang Wang wrote:
> Hello John,
>
> I'll make note that you owe me a beer now :)
>
> I think I'm leaning towards your approach as well based on my observations
> on previously reported timeout exceptions in the past. I once left some
> thoughts on Matthias' PR here
> https://github.com/apache/kafka/pull/8122#pullrequestreview-360749510 and I
> think I can better summarize my thoughts in this thread:
>
> 1) First of all, we need to think from user's perspective, what they'd
> really want to be notified:
>
> a. "If my application cannot make progress temporarily due to various
> transient issues (John had listed several examples already), just handle
> that internally and I do not wanna be notified and worry about how to tune
> my timeout configs at all".
> b. "If my application cannot make progress for a long time, which is likely
> due to a bad config, a human error, network issues, etc such that I should
> be involved in the loop of trouble shooting, let me know sooner than later".
>
> and what they'd not preferred but may happen today:
>
> c. "one transient error cause a thread to die, but then after tasks
> migrated everything goes to normal; so the application silently lost a
> thread without letting me know"; in fact, in such cases even a cascading
> exception that eventually kills all thread may be better since at least the
> users would be notified.
>
> Based on that, instead of retrying immediately at the granularity each
> blocking call, it should be sufficient to only consider the handling logic
> at the thread level. That is, within an iteration of the thread, it would
> try to:
>
> * initialized some created tasks;
> * restored some restoring tasks;
> * processed some running tasks who have buffered records that are
> processable;
> * committed some tasks.
>
> In each of these steps, we may need to make some blocking calls in the
> underlying embedded clients, and if either of them timed out, we would not
> be able to make progress partially or not being able to make any progress
> at all. If we still want to set a configured value of "retries", I think a
> better idea would be to say "if we cannot make progress for consecutive N
> iterations of a thread, the user should be notified".
>
> ---------------
>
> 2) Second, let's consider what's a good way to notify the user. Today our
> way of notification is simple: throw the exception all the way up to user's
> uncaught-exception-handler (if it's registered) and let the thread die. I'm
> wondering if we could instead educate users to watch on some key metrics
> for "progress indicate" than relying on the exception handler though. Some
> candidates in mind:
>
> * consumer-lag: this is for both source topics and for repartition topics,
> it indicates if one or more of the tasks within each sub-topology is
> lagging or not; in the case where *some or all* of the threads cannot make
> progress. E.g. if a downstream task's thread is blocked somehow while its
> upstream task's thread is not, then the consumer-lag on the repartition
> topic would keep growing.
>
> * *idle* state: this is an idea we discussed in
> https://issues.apache.org/jira/browse/KAFKA-6520, i.e. to introduce an
> instance-level new state, if all threads of the instance cannot make
> progress (primarily for the reason that it cannot talk to the brokers).
>
> * process-rate: this is at thread-level. However if some tasks cannot make
> progress while others can still make progress within a thread, its
> process-rate would now drop to zero and it's a bit hard to indicate
> compared with comsumer-lag.
>
> If we feel that relying on metrics is better than throwing the exception
> and let the thread die, then we would not need to have the "retry" config
> as well.
>
> ---------------
>
> 3) This maybe semi-related to the timeout itself, but as I mentioned today
> one common issue we would need to resolve is to lose a thread BUT not
> losing the whole instance. In other words, we should consider when we have
> to throw an exception from a thread (not due to timeouts, but say due to
> some fatal error), should we just kill the corresponding thread or should
> we be more brutal and just kill the whole instance instead. I'm happy to
> defer this to another discussion thread but just bring this up here.
>
>
>
> Guozhang
>
>

Re: [DISCUSS] KIP-572: Improve timeouts and retires in Kafka Streams

Posted by Guozhang Wang <wa...@gmail.com>.
Hello John,

I'll make note that you owe me a beer now :)

I think I'm leaning towards your approach as well, based on my observations
of previously reported timeout exceptions. I once left some
thoughts on Matthias' PR here
https://github.com/apache/kafka/pull/8122#pullrequestreview-360749510 and I
think I can better summarize my thoughts in this thread:

1) First of all, we need to think from the user's perspective about what
they'd really want to be notified of:

a. "If my application cannot make progress temporarily due to various
transient issues (John had listed several examples already), just handle
that internally and I do not wanna be notified and worry about how to tune
my timeout configs at all".
b. "If my application cannot make progress for a long time, which is likely
due to a bad config, a human error, network issues, etc., such that I should
be involved in the loop of troubleshooting, let me know sooner than later".

and what they would not prefer, but which may happen today:

c. "One transient error causes a thread to die, but then after tasks
migrate everything goes back to normal; so the application silently loses a
thread without letting me know"; in fact, in such cases even a cascading
exception that eventually kills all threads may be better, since at least the
users would be notified.

Based on that, instead of retrying immediately at the granularity of each
blocking call, it should be sufficient to only consider the handling logic
at the thread level. That is, within an iteration of the thread, it would
try to:

* initialize some created tasks;
* restore some restoring tasks;
* process some running tasks that have buffered records that are
processable;
* commit some tasks.

In each of these steps, we may need to make some blocking calls in the
underlying embedded clients, and if any of them times out, we would be
able to make only partial progress, or no progress at all. If we still
want to set a configured value of "retries", I think a better idea would
be to say "if we cannot make progress for N consecutive iterations of a
thread, the user should be notified".
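
As a rough Java sketch of that "N consecutive iterations without
progress" rule (ProgressWatchdog and recordIteration are hypothetical
names, not part of Kafka Streams, and what counts as "progress" in an
iteration is left to the caller):

```java
/** Hypothetical helper: signals after N consecutive thread iterations without progress. */
final class ProgressWatchdog {
    private final int maxIdleIterations;
    private int idleIterations = 0;

    ProgressWatchdog(int maxIdleIterations) {
        if (maxIdleIterations <= 0) {
            throw new IllegalArgumentException("maxIdleIterations must be positive");
        }
        this.maxIdleIterations = maxIdleIterations;
    }

    /**
     * Record the outcome of one thread iteration.
     * Returns true when the user should be notified.
     */
    boolean recordIteration(boolean madeProgress) {
        if (madeProgress) {
            idleIterations = 0; // any progress resets the counter
            return false;
        }
        idleIterations++;
        return idleIterations >= maxIdleIterations;
    }
}
```

The thread would call recordIteration once per loop iteration, passing
true if any of the steps above (initialize, restore, process, commit) got
something done.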

---------------

2) Second, let's consider what's a good way to notify the user. Today our
way of notification is simple: throw the exception all the way up to the
user's uncaught-exception-handler (if it's registered) and let the thread
die. I'm wondering if we could instead educate users to watch some key
metrics as "progress indicators" rather than relying on the exception
handler. Some candidates in mind:

* consumer-lag: this covers both source topics and repartition topics.
It indicates whether one or more of the tasks within each sub-topology is
lagging, in the case where *some or all* of the threads cannot make
progress. E.g. if a downstream task's thread is blocked somehow while its
upstream task's thread is not, then the consumer-lag on the repartition
topic would keep growing.

* *idle* state: this is an idea we discussed in
https://issues.apache.org/jira/browse/KAFKA-6520, i.e. to introduce a
new instance-level state that is entered if all threads of the instance
cannot make progress (primarily because it cannot talk to the brokers).

* process-rate: this is at thread-level. However, if some tasks cannot make
progress while others can still make progress within a thread, its
process-rate would not drop to zero, so it is a weaker indicator
than consumer-lag.

If we feel that relying on metrics is better than throwing the exception
and letting the thread die, then we would not need to have the "retry"
config either.
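
To illustrate the consumer-lag idea, here is a small Java sketch. It
assumes lag is computed per partition as log end offset minus committed
offset and that the offsets are simply passed in as maps; in a real
deployment they would come from consumer metrics or the admin tooling.
LagSketch, totalLag and lagKeepsGrowing are hypothetical names.

```java
import java.util.Map;

/** Hypothetical helper for using consumer lag as a progress indicator. */
final class LagSketch {
    private LagSketch() { }

    /** Total lag: sum over partitions of (log end offset - committed offset). */
    static long totalLag(Map<String, Long> endOffsets, Map<String, Long> committedOffsets) {
        long lag = 0L;
        for (Map.Entry<String, Long> entry : endOffsets.entrySet()) {
            // A partition without a committed offset is treated as committed at 0.
            long committed = committedOffsets.getOrDefault(entry.getKey(), 0L);
            lag += Math.max(0L, entry.getValue() - committed);
        }
        return lag;
    }

    /** True if lag strictly increased between every pair of consecutive samples. */
    static boolean lagKeepsGrowing(long[] samples) {
        if (samples.length < 2) {
            return false; // not enough data to call it a trend
        }
        for (int i = 1; i < samples.length; i++) {
            if (samples[i] <= samples[i - 1]) {
                return false;
            }
        }
        return true;
    }
}
```

An operator alert could then fire when lagKeepsGrowing holds over a
window of samples for a repartition topic, which matches the blocked
downstream task example above.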

---------------

3) This may be only semi-related to the timeout itself, but as I mentioned,
today one common issue we would need to resolve is losing a thread BUT not
losing the whole instance. In other words, we should consider, when we have
to throw an exception from a thread (not due to timeouts, but say due to
some fatal error), whether we should just kill the corresponding thread or
be more brutal and kill the whole instance instead. I'm happy to
defer this to another discussion thread but wanted to bring it up here.



Guozhang




-- 
-- Guozhang

Re: [DISCUSS] KIP-572: Improve timeouts and retires in Kafka Streams

Posted by John Roesler <vv...@apache.org>.
Hi Matthias,

Thanks for the proposal! I think this will be a wonderful improvement
to Streams. In particular, thanks for the motivation. It would indeed
be nice not to have to set long timeout configs and block individual 
client requests in order to cope with transient slow responses.

I'm very well aware that this might make me sound like a crazy person,
but one alternative I'd like to consider is not bounding the retries at all.
Instead, Streams would just skip over timed-out tasks and try again
on the next iteration, as you proposed, but would continue to do so
indefinitely. Clearly, we shouldn't do such a thing silently, so I'd further
propose to log a warning every time a task times out and also maintain
a new metric indicating task timeouts.
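
As a rough illustration of the behavior proposed above, the following plain
Java sketch skips a timed-out task, logs a warning, bumps a counter, and
retries on the next iteration with no retry bound. Everything here is a
hypothetical stand-in: the Task interface, the task(...) factory, and the
taskTimeouts counter are invented for the example and are not Kafka Streams
classes or metrics.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical model of an unbounded skip-and-retry loop; not Streams code. */
public class SkipOnTimeoutLoop {

    /** Stand-in for a Streams task (assumption, not a real API). */
    public interface Task {
        String id();
        void process() throws TimeoutException;
    }

    /** Test helper: a task that either always succeeds or always times out. */
    public static Task task(String id, boolean timesOut) {
        return new Task() {
            @Override public String id() { return id; }
            @Override public void process() throws TimeoutException {
                if (timesOut) {
                    throw new TimeoutException("broker did not respond");
                }
            }
        };
    }

    // Stand-in for the proposed "task timeouts" metric.
    private final AtomicLong taskTimeouts = new AtomicLong();

    public long taskTimeouts() {
        return taskTimeouts.get();
    }

    /** One iteration over all tasks; returns the ids that made progress. */
    public List<String> runOnce(List<Task> tasks) {
        List<String> progressed = new ArrayList<>();
        for (Task task : tasks) {
            try {
                task.process();
                progressed.add(task.id());
            } catch (TimeoutException e) {
                // Never silent: count it and warn, then move on to the next task.
                taskTimeouts.incrementAndGet();
                System.err.println("WARN task " + task.id()
                        + " timed out, will retry on next iteration: " + e.getMessage());
            }
        }
        return progressed;
    }
}
```

Calling runOnce(...) repeatedly models the thread's main loop: healthy tasks
keep progressing, the stuck task is retried forever, and the counter is what
an operator would alert on.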

To see why this might be attractive, let me pose a hypothetical installation
which has thousands of Streams instances, maybe as part of hundreds of
applications belonging to dozens of teams. Let's also assume there is a
single broker cluster serving all these instances. Such an environment has
a number of transient failure modes:
* A single broker instance may become slow to respond due to hardware
failures (e.g., a bad NIC) or other environmental causes (CPU competition
with co-resident processes, long JVM GC pauses, etc.). Single-broker
unavailability could cause some tasks to time out while others can proceed
in an individual Streams instance.
* The entire broker cluster could become temporarily unavailable (consider:
a faulty firewall configuration gets deployed, severing all Streams instances
from the brokers).
* A faulty security configuration may temporarily sever a whole application
from the brokers.
* Any number of causes could likewise sever a single instance in a single
application from all brokers.
* Finally, networking problems can disconnect arbitrary pairs of Streams
instances and Broker instances.

This is not an accounting of all possible failure modes, obviously, but the
point is that, in a large, decentralized organization, you can experience
lots of transient failures that have some features in common:
F1. It's someone else's fault, and someone else must take action to fix it.
F2. It will take "human time" to fix it. I.e., hours, not milliseconds.
F3. A single failure can affect "everyone" (e.g., one broker with long GCs
can cause timeouts in all thousands of instances over all dozens of teams).

As an operator belonging to one team, whether we bound retries or not,
I would need to be alerted when the app stops making progress, I'd need
to investigate, and in the above cases, I'd need to escalate to the network
and/or broker infrastructure teams.

Clearly, I can be alerted either by threads dying or by non-progress metrics.
As a responsible operator, I'd have alerts on _both_, so we shouldn't consider
either signal to be "louder" or more reliable than the other.

A side observation: in a lot of the failure modes, a specific task won't be able
to make progress no matter which thread or instance it's on (e.g., if the
transaction coordinator for one of its output partitions is slow or unresponsive).
Therefore, killing the thread with a bounded retry config would only result
in a cascade of thread deaths across all my instances until either I run out of
threads or the incident is resolved.

The key questions to me are:
Q1. Do we want to continue trying to make what progress we can while
the incident is being investigated and remediated?
Q2. Should I (the operator for a single team) have to take any action once
the infrastructure failures are resolved?

We can paraphrase these as, "do you want your business to grind to a halt
due to a single failure?", and "do you want everyone to stay up all night
waiting for a fix so they can all restart their applications?"

Just from the operator/business perspective, it seems like we want:
Q1:yes and Q2:no, which in combination with F1,2,3 above indicates
to me that it would be better for Streams to just keep retrying indefinitely.

One point I think you've mentioned to me in the past is that it
may not be _safe_ to just quit working on one specific task while
progressing on others. If we have a repartition topic sourced by
two tasks T1 and T2, and feeding a windowed aggregation (for example),
then failing to process T1 while continuing on T2 for a long time
would cause a lot of timestamp skew, and could ultimately result in all
those delayed records in T1 being out of grace period by the time they
get processed. Arguably, this is a completely normal and expected
situation in a distributed system, which is why we have grace period to
begin with, but since the cause of this particular skew is inside of
Streams, it would be possible and nice to detect and avoid the situation.
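
The skew effect can be shown with a deliberately simplified model in plain
Java. The assumptions are mine, not the real Streams implementation:
stream time is the maximum timestamp seen so far, windows are tumbling and
aligned at zero, and a record is dropped once stream time has reached its
window end plus the grace period. The class name GraceSkewModel and its
fields are hypothetical.

```java
/** Simplified, hypothetical model of grace-period handling; not Streams code. */
public class GraceSkewModel {
    private final long windowSizeMs;
    private final long graceMs;
    private long streamTimeMs = Long.MIN_VALUE;   // max timestamp observed so far

    public GraceSkewModel(long windowSizeMs, long graceMs) {
        if (windowSizeMs <= 0 || graceMs < 0) {
            throw new IllegalArgumentException("window size must be > 0 and grace >= 0");
        }
        this.windowSizeMs = windowSizeMs;
        this.graceMs = graceMs;
    }

    /** Returns true if the record is still accepted, false if it is too late. */
    public boolean accept(long timestampMs) {
        if (timestampMs < 0) {
            throw new IllegalArgumentException("timestamp must be non-negative");
        }
        streamTimeMs = Math.max(streamTimeMs, timestampMs);
        long windowStart = timestampMs - (timestampMs % windowSizeMs);
        long windowEnd = windowStart + windowSizeMs;
        // The window is considered closed once stream time reaches end + grace.
        return streamTimeMs < windowEnd + graceMs;
    }
}
```

With a window of 10 ms and a grace of 5 ms, records from T2 at timestamps 12
and 40 advance stream time to 40, so a delayed record from T1 at timestamp 4
(window [0, 10), closing at 15) is rejected, while a T1 record at 38 is
still accepted.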

However, we should note that killing a single thread that hosts T1 will
_not_ deterministically halt processing on T2, nor will stopping the
single instance that hosts T1, since T2 might be on another instance.
We would need a cluster-wide broadcast of some kind to either halt
all processing on all tasks, or (more sophisticated) to halt processing
on T2 when we detect non-progress of T1.

Depending on the failure mode, it's possible that just shuffling the tasks
around could let us start making progress again, for example when only
a single Streams instance can't reach one or more brokers. However, this
objective is not accomplished by just stopping one thread; we would need
to relocate all tasks off the affected instance to attempt this remediation.

A separate thing to point out is that an instance being unavailable
for processing does not imply that it is also unavailable for querying.
Especially in light of KIP-535, where we started to support querying
"stale" stores, it seems worthwhile to keep the threads and instances
alive, even if they pause processing.

Well, if you've made it this far, congratulations. I owe you a beer. I hope
you don't interpret the length of this email as reflective of my zeal. I'm
also ok with a bounded retries config of some kind, but I wanted to be
sure we've considered the above effects.

Thanks,
-John

