You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Morgan Geldenhuys <mo...@tu-berlin.de> on 2020/02/03 11:20:28 UTC

Question: Determining Total Recovery Time

Community,

I am interested in determining the total time to recover for a Flink 
application after experiencing a partial failure. Let's assume a 
pipeline consisting of Kafka -> Flink -> Kafka with Exactly-Once 
guarantees enabled.

Taking a look at the documentation 
(https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/metrics.html), 
one of the metrics which can be gathered is /recoveryTime/. However, as 
far as I can tell this is only the time taken for the system to go from 
an inconsistent state back into a consistent state, i.e. restarting the 
job. Is there any way of measuring the amount of time taken from the 
point when the failure occurred till the point when the system catches 
up to the last message that was processed before the outage?

Thank you very much in advance!

Regards,
Morgan.

Re: Question: Determining Total Recovery Time

Posted by Zhijiang <wa...@aliyun.com>.
Hi Morgan,

Your idea is great and i am also interested in it.
 I think it is valuable for some users to estimate the maximum throughput capacity based on certain metrics or models.
But I am not quite sure whether it is feasible to do that based on existing metrics, at-least exist some limitations as Arvid mentioned.

If I understand correctly, the maximum throughput you want to measure is based on source emitting,
 that means some data are still buffered in mid topology and not processed yet. If so, we might refer to the metrics of `inputQueueLength`
and `inPoolUsage` together. Note if the `inPoolUsage` reaches 100%, it does not mean all the buffers are already filled with data, and just mean
all the available buffers are requested away. So `inputQueueLength` would be more precise to predict the available condition if we are aware of the
total buffer amount. In general we can make use of these two together.

 We can find the largest value of above metrics from all the topology tasks, which probably hint the bottleneck in the whole view. Then we can estimate
how many available buffers are left to hold more source emitting throughput. But there is a limitation if all the metrics are `0` in light-weight situation,
which i mentioned above. So we can not estimate the saturation unless we increase the source emit.

Wish good news sharing from you!

Best,
Zhijiang


------------------------------------------------------------------
From:Arvid Heise <ar...@ververica.com>
Send Time:2020 Feb. 26 (Wed.) 22:29
To:Morgan Geldenhuys <mo...@tu-berlin.de>
Cc:Timo Walther <tw...@apache.org>; user <us...@flink.apache.org>
Subject:Re: Question: Determining Total Recovery Time

Hi Morgan,

doing it in a very general way sure is challenging.

I'd assume that your idea of using the buffer usage has some shortcomings (which I don't know), but I also think it's a good starting point.

Have you checked the PoolUsage metrics? [1] You could use them to detect the bottleneck and then estimate the max capacity of the whole job.

Btw, I'd be interested in results. We have the idea of adjustable buffer sizes and the insights would help us.

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/metrics.html#default-shuffle-service
On Tue, Feb 25, 2020 at 6:36 PM Morgan Geldenhuys <mo...@tu-berlin.de> wrote:

 Hi Arvid, Timo,

 Really appreciate the feedback. I have one final question on this topic and hope you dont mind me posing it to you directly. I posted the question earlier to the mailing list, but am looking at this more from an academic perspective as apposed to manually optimizing a specific job for a specific production environment. I do not know the flink internals well enough to determine if I can accomplish what I am looking for.

 For an experiment, I need to work out the Total Recovery Time (TRT). I define this as the time it takes the system to "catch up" to the current timestamp assuming event time processing after a node failure.

 I would like to follow a heuristic approach which is: 

job+environment agnostic, 
does not involve load testing, 
does not involve modifying the job or flink codebase, and 
relies solely on the metrics supplied.  As far as I know (and correct me if im wrong): TRT = heartbeat.timeout + recoveryTime+ time to reprocess uncheckpointed messages + lag to catch up to current timestamp.

 In order to predict TRT, I need some kind of resource utilization model based on the current processing capacity and maximum processing limit, let me explain:

Backpressure is essentially the point at which utilization has reached 100% for any particular streaming pipeline and means that the application has reached the max limit of messages that it can process per second. 
Lets consider an example: The system is running along perfectly fine under normal conditions, accessing external sources, and processing at an average of 100,000 messages/sec. Lets assume the maximum capacity is around 130,000 message/sec before back pressure starts propagating messages back up the stream. Therefore, utilization is at 0.76 (100K/130K). Great, but at present we dont know that 130,000 is the limit without load testing. 
For this example, is there a way of finding this maximum capacity (and hence the utilization) without pushing the system to its limit based solely on the average current throughput? Possibly by measuring the saturation of certain buffers between the operators? 
 If this is possible, the unused utilization can be used to predict how fast a system would get back to the current timestamp. Again, its a heuristic so it doesn't have to be extremely precise. Any hints would be greatly appreciated.

 Thank you very much!

 Regards,
 Morgan.

On 21.02.20 14:44, Arvid Heise wrote:
Hi Morgan,

sorry for the late reply. In general, that should work. You need to ensure that the same task is processing the same record though.

Local copy needs to be state or else the last message would be lost upon restart. Performance will take a hit but if that is significant depends on the remaining pipeline.

Btw, at least once should be enough for that, since you implicitly deduplicating.

Best,

Arvid

On Tue, Feb 11, 2020 at 11:24 AM Morgan Geldenhuys <mo...@tu-berlin.de> wrote:
 Thanks for the advice, i will look into it.

 Had a quick think about another simple solution but we would need a hook into the checkpoint process from the task/operator perspective, which I haven't looked into yet. It would work like this:

 - The sink operators (?) would keep a local copy of the last message processed (or digest?), the current timestamp, and a boolean value indicating whether or not the system is in recovery or not. 
 - While not in recovery, update the local copy and timestamp with each new event processed.
 - When a failure is detected and the taskmanagers are notified to rollback, we use the hook into this process to switch the boolean value to true. 
 - While true, it compares each new message with the last one processed before the recovery process was initiated.
 - When a match is found, the difference between the previous and current timestamp is calculated and outputted as a custom metric and the boolean is reset to false.

 From here, the mean total recovery time could be calculated across the operators. Not sure how it would impact on performance, but i doubt it would be significant. We would need to ensure exactly once so that the message would be guaranteed to be seen again. thoughts?

On 11.02.20 08:57, Arvid Heise wrote:
Hi Morgan,

as Timo pointed out, there is no general solution, but in your setting, you could look at the consumer lag of the input topic after a crash. Lag would spike until all tasks restarted and reprocessing begins. Offsets are only committed on checkpoints though by default.

Best,

Arvid

On Tue, Feb 4, 2020 at 12:32 PM Timo Walther <tw...@apache.org> wrote:
 Hi Morgan,

 as far as I know this is not possible mostly because measuring "till the 
 point when the system catches up to the last message" is very 
 pipeline/connector dependent. Some sources might need to read from the 
 very beginning, some just continue from the latest checkpointed offset.

 Measure things like that (e.g. for experiments) might require collecting 
 own metrics as part of your pipeline definition.

 Regards,
 Timo


 On 03.02.20 12:20, Morgan Geldenhuys wrote:
 > Community,
 > 
 > I am interested in determining the total time to recover for a Flink 
 > application after experiencing a partial failure. Let's assume a 
 > pipeline consisting of Kafka -> Flink -> Kafka with Exactly-Once 
 > guarantees enabled.
 > 
 > Taking a look at the documentation 
 > (https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/metrics.html), 
 > one of the metrics which can be gathered is /recoveryTime/. However, as 
 > far as I can tell this is only the time taken for the system to go from 
 > an inconsistent state back into a consistent state, i.e. restarting the 
 > job. Is there any way of measuring the amount of time taken from the 
 > point when the failure occurred till the point when the system catches 
 > up to the last message that was processed before the outage?
 > 
 > Thank you very much in advance!
 > 
 > Regards,
 > Morgan.





Re: Question: Determining Total Recovery Time

Posted by Arvid Heise <ar...@ververica.com>.
Hi Morgan,

doing it in a very general way sure is challenging.

I'd assume that your idea of using the buffer usage has some shortcomings
(which I don't know), but I also think it's a good starting point.

Have you checked the PoolUsage metrics? [1] You could use them to detect
the bottleneck and then estimate the max capacity of the whole job.

Btw, I'd be interested in results. We have the idea of adjustable buffer
sizes and the insights would help us.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/metrics.html#default-shuffle-service

On Tue, Feb 25, 2020 at 6:36 PM Morgan Geldenhuys <
morgan.geldenhuys@tu-berlin.de> wrote:

> Hi Arvid, Timo,
>
> Really appreciate the feedback. I have one final question on this topic
> and hope you dont mind me posing it to you directly. I posted the question
> earlier to the mailing list, but am looking at this more from an academic
> perspective as apposed to manually optimizing a specific job for a specific
> production environment. I do not know the flink internals well enough to
> determine if I can accomplish what I am looking for.
>
> For an experiment, I need to work out the Total Recovery Time (TRT). I
> define this as the time it takes the system to "catch up" to the current
> timestamp assuming event time processing after a node failure.
>
> I would like to follow a heuristic approach which is:
>
>    1. job+environment agnostic,
>    2. does not involve load testing,
>    3. does not involve modifying the job or flink codebase, and
>    4. relies solely on the metrics supplied.
>
> As far as I know (and correct me if im wrong): TRT = heartbeat.timeout +
> recoveryTime+ time to reprocess uncheckpointed messages + lag to catch up
> to current timestamp.
>
> In order to predict TRT, I need some kind of resource utilization model
> based on the current processing capacity and maximum processing limit, let
> me explain:
>
>    - Backpressure is essentially the point at which utilization has
>    reached 100% for any particular streaming pipeline and means that the
>    application has reached the max limit of messages that it can process per
>    second.
>    - Lets consider an example: The system is running along perfectly fine
>    under normal conditions, accessing external sources, and processing at an
>    average of 100,000 messages/sec. Lets assume the maximum capacity is around
>    130,000 message/sec before back pressure starts propagating messages back
>    up the stream. Therefore, utilization is at 0.76 (100K/130K). Great, but at
>    present we dont know that 130,000 is the limit without load testing.
>    - For this example, is there a way of finding this maximum capacity
>    (and hence the utilization) without pushing the system to its limit based
>    solely on the average current throughput? Possibly by measuring the
>    saturation of certain buffers between the operators?
>
> If this is possible, the unused utilization can be used to predict how
> fast a system would get back to the current timestamp. Again, its a
> heuristic so it doesn't have to be extremely precise. Any hints would be
> greatly appreciated.
>
> Thank you very much!
>
> Regards,
> Morgan.
>
> On 21.02.20 14:44, Arvid Heise wrote:
>
> Hi Morgan,
>
> sorry for the late reply. In general, that should work. You need to ensure
> that the same task is processing the same record though.
>
> Local copy needs to be state or else the last message would be lost upon
> restart. Performance will take a hit but if that is significant depends on
> the remaining pipeline.
>
> Btw, at least once should be enough for that, since you implicitly
> deduplicating.
>
> Best,
>
> Arvid
>
> On Tue, Feb 11, 2020 at 11:24 AM Morgan Geldenhuys <
> morgan.geldenhuys@tu-berlin.de> wrote:
>
>> Thanks for the advice, i will look into it.
>>
>> Had a quick think about another simple solution but we would need a hook
>> into the checkpoint process from the task/operator perspective, which I
>> haven't looked into yet. It would work like this:
>>
>> - The sink operators (?) would keep a local copy of the last message
>> processed (or digest?), the current timestamp, and a boolean value
>> indicating whether or not the system is in recovery or not.
>> - While not in recovery, update the local copy and timestamp with each
>> new event processed.
>> - When a failure is detected and the taskmanagers are notified to
>> rollback, we use the hook into this process to switch the boolean value to
>> true.
>> - While true, it compares each new message with the last one processed
>> before the recovery process was initiated.
>> - When a match is found, the difference between the previous and current
>> timestamp is calculated and outputted as a custom metric and the boolean is
>> reset to false.
>>
>> From here, the mean total recovery time could be calculated across the
>> operators. Not sure how it would impact on performance, but i doubt it
>> would be significant. We would need to ensure exactly once so that the
>> message would be guaranteed to be seen again. thoughts?
>>
>> On 11.02.20 08:57, Arvid Heise wrote:
>>
>> Hi Morgan,
>>
>> as Timo pointed out, there is no general solution, but in your setting,
>> you could look at the consumer lag of the input topic after a crash. Lag
>> would spike until all tasks restarted and reprocessing begins. Offsets are
>> only committed on checkpoints though by default.
>>
>> Best,
>>
>> Arvid
>>
>> On Tue, Feb 4, 2020 at 12:32 PM Timo Walther <tw...@apache.org> wrote:
>>
>>> Hi Morgan,
>>>
>>> as far as I know this is not possible mostly because measuring "till the
>>> point when the system catches up to the last message" is very
>>> pipeline/connector dependent. Some sources might need to read from the
>>> very beginning, some just continue from the latest checkpointed offset.
>>>
>>> Measure things like that (e.g. for experiments) might require collecting
>>> own metrics as part of your pipeline definition.
>>>
>>> Regards,
>>> Timo
>>>
>>>
>>> On 03.02.20 12:20, Morgan Geldenhuys wrote:
>>> > Community,
>>> >
>>> > I am interested in determining the total time to recover for a Flink
>>> > application after experiencing a partial failure. Let's assume a
>>> > pipeline consisting of Kafka -> Flink -> Kafka with Exactly-Once
>>> > guarantees enabled.
>>> >
>>> > Taking a look at the documentation
>>> > (
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/metrics.html),
>>>
>>> > one of the metrics which can be gathered is /recoveryTime/. However,
>>> as
>>> > far as I can tell this is only the time taken for the system to go
>>> from
>>> > an inconsistent state back into a consistent state, i.e. restarting
>>> the
>>> > job. Is there any way of measuring the amount of time taken from the
>>> > point when the failure occurred till the point when the system catches
>>> > up to the last message that was processed before the outage?
>>> >
>>> > Thank you very much in advance!
>>> >
>>> > Regards,
>>> > Morgan.
>>>
>>>
>>
>

Re: Question: Determining Total Recovery Time

Posted by Arvid Heise <ar...@ververica.com>.
Hi Morgan,

sorry for the late reply. In general, that should work. You need to ensure
that the same task is processing the same record though.

Local copy needs to be state or else the last message would be lost upon
restart. Performance will take a hit but if that is significant depends on
the remaining pipeline.

Btw, at least once should be enough for that, since you implicitly
deduplicating.

Best,

Arvid

On Tue, Feb 11, 2020 at 11:24 AM Morgan Geldenhuys <
morgan.geldenhuys@tu-berlin.de> wrote:

> Thanks for the advice, i will look into it.
>
> Had a quick think about another simple solution but we would need a hook
> into the checkpoint process from the task/operator perspective, which I
> haven't looked into yet. It would work like this:
>
> - The sink operators (?) would keep a local copy of the last message
> processed (or digest?), the current timestamp, and a boolean value
> indicating whether or not the system is in recovery or not.
> - While not in recovery, update the local copy and timestamp with each new
> event processed.
> - When a failure is detected and the taskmanagers are notified to
> rollback, we use the hook into this process to switch the boolean value to
> true.
> - While true, it compares each new message with the last one processed
> before the recovery process was initiated.
> - When a match is found, the difference between the previous and current
> timestamp is calculated and outputted as a custom metric and the boolean is
> reset to false.
>
> From here, the mean total recovery time could be calculated across the
> operators. Not sure how it would impact on performance, but i doubt it
> would be significant. We would need to ensure exactly once so that the
> message would be guaranteed to be seen again. thoughts?
>
> On 11.02.20 08:57, Arvid Heise wrote:
>
> Hi Morgan,
>
> as Timo pointed out, there is no general solution, but in your setting,
> you could look at the consumer lag of the input topic after a crash. Lag
> would spike until all tasks restarted and reprocessing begins. Offsets are
> only committed on checkpoints though by default.
>
> Best,
>
> Arvid
>
> On Tue, Feb 4, 2020 at 12:32 PM Timo Walther <tw...@apache.org> wrote:
>
>> Hi Morgan,
>>
>> as far as I know this is not possible mostly because measuring "till the
>> point when the system catches up to the last message" is very
>> pipeline/connector dependent. Some sources might need to read from the
>> very beginning, some just continue from the latest checkpointed offset.
>>
>> Measure things like that (e.g. for experiments) might require collecting
>> own metrics as part of your pipeline definition.
>>
>> Regards,
>> Timo
>>
>>
>> On 03.02.20 12:20, Morgan Geldenhuys wrote:
>> > Community,
>> >
>> > I am interested in determining the total time to recover for a Flink
>> > application after experiencing a partial failure. Let's assume a
>> > pipeline consisting of Kafka -> Flink -> Kafka with Exactly-Once
>> > guarantees enabled.
>> >
>> > Taking a look at the documentation
>> > (
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/metrics.html),
>>
>> > one of the metrics which can be gathered is /recoveryTime/. However, as
>> > far as I can tell this is only the time taken for the system to go from
>> > an inconsistent state back into a consistent state, i.e. restarting the
>> > job. Is there any way of measuring the amount of time taken from the
>> > point when the failure occurred till the point when the system catches
>> > up to the last message that was processed before the outage?
>> >
>> > Thank you very much in advance!
>> >
>> > Regards,
>> > Morgan.
>>
>>
>

Re: Question: Determining Total Recovery Time

Posted by Morgan Geldenhuys <mo...@tu-berlin.de>.
Thanks for the advice, i will look into it.

Had a quick think about another simple solution but we would need a hook 
into the checkpoint process from the task/operator perspective, which I 
haven't looked into yet. It would work like this:

- The sink operators (?) would keep a local copy of the last message 
processed (or digest?), the current timestamp, and a boolean value 
indicating whether or not the system is in recovery or not.
- While not in recovery, update the local copy and timestamp with each 
new event processed.
- When a failure is detected and the taskmanagers are notified to 
rollback, we use the hook into this process to switch the boolean value 
to true.
- While true, it compares each new message with the last one processed 
before the recovery process was initiated.
- When a match is found, the difference between the previous and current 
timestamp is calculated and outputted as a custom metric and the boolean 
is reset to false.

 From here, the mean total recovery time could be calculated across the 
operators. Not sure how it would impact on performance, but i doubt it 
would be significant. We would need to ensure exactly once so that the 
message would be guaranteed to be seen again. thoughts?

On 11.02.20 08:57, Arvid Heise wrote:
> Hi Morgan,
>
> as Timo pointed out, there is no general solution, but in your 
> setting, you could look at the consumer lag of the input topic after a 
> crash. Lag would spike until all tasks restarted and reprocessing 
> begins. Offsets are only committed on checkpoints though by default.
>
> Best,
>
> Arvid
>
> On Tue, Feb 4, 2020 at 12:32 PM Timo Walther <twalthr@apache.org 
> <ma...@apache.org>> wrote:
>
>     Hi Morgan,
>
>     as far as I know this is not possible mostly because measuring
>     "till the
>     point when the system catches up to the last message" is very
>     pipeline/connector dependent. Some sources might need to read from
>     the
>     very beginning, some just continue from the latest checkpointed
>     offset.
>
>     Measure things like that (e.g. for experiments) might require
>     collecting
>     own metrics as part of your pipeline definition.
>
>     Regards,
>     Timo
>
>
>     On 03.02.20 12:20, Morgan Geldenhuys wrote:
>     > Community,
>     >
>     > I am interested in determining the total time to recover for a
>     Flink
>     > application after experiencing a partial failure. Let's assume a
>     > pipeline consisting of Kafka -> Flink -> Kafka with Exactly-Once
>     > guarantees enabled.
>     >
>     > Taking a look at the documentation
>     >
>     (https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/metrics.html),
>
>     > one of the metrics which can be gathered is /recoveryTime/.
>     However, as
>     > far as I can tell this is only the time taken for the system to
>     go from
>     > an inconsistent state back into a consistent state, i.e.
>     restarting the
>     > job. Is there any way of measuring the amount of time taken from
>     the
>     > point when the failure occurred till the point when the system
>     catches
>     > up to the last message that was processed before the outage?
>     >
>     > Thank you very much in advance!
>     >
>     > Regards,
>     > Morgan.
>


Re: Question: Determining Total Recovery Time

Posted by Arvid Heise <ar...@ververica.com>.
Hi Morgan,

as Timo pointed out, there is no general solution, but in your setting, you
could look at the consumer lag of the input topic after a crash. Lag would
spike until all tasks restarted and reprocessing begins. Offsets are only
committed on checkpoints though by default.

Best,

Arvid

On Tue, Feb 4, 2020 at 12:32 PM Timo Walther <tw...@apache.org> wrote:

> Hi Morgan,
>
> as far as I know this is not possible mostly because measuring "till the
> point when the system catches up to the last message" is very
> pipeline/connector dependent. Some sources might need to read from the
> very beginning, some just continue from the latest checkpointed offset.
>
> Measure things like that (e.g. for experiments) might require collecting
> own metrics as part of your pipeline definition.
>
> Regards,
> Timo
>
>
> On 03.02.20 12:20, Morgan Geldenhuys wrote:
> > Community,
> >
> > I am interested in determining the total time to recover for a Flink
> > application after experiencing a partial failure. Let's assume a
> > pipeline consisting of Kafka -> Flink -> Kafka with Exactly-Once
> > guarantees enabled.
> >
> > Taking a look at the documentation
> > (
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/metrics.html),
>
> > one of the metrics which can be gathered is /recoveryTime/. However, as
> > far as I can tell this is only the time taken for the system to go from
> > an inconsistent state back into a consistent state, i.e. restarting the
> > job. Is there any way of measuring the amount of time taken from the
> > point when the failure occurred till the point when the system catches
> > up to the last message that was processed before the outage?
> >
> > Thank you very much in advance!
> >
> > Regards,
> > Morgan.
>
>

Re: Question: Determining Total Recovery Time

Posted by Timo Walther <tw...@apache.org>.
Hi Morgan,

as far as I know this is not possible mostly because measuring "till the 
point when the system catches up to the last message" is very 
pipeline/connector dependent. Some sources might need to read from the 
very beginning, some just continue from the latest checkpointed offset.

Measure things like that (e.g. for experiments) might require collecting 
own metrics as part of your pipeline definition.

Regards,
Timo


On 03.02.20 12:20, Morgan Geldenhuys wrote:
> Community,
> 
> I am interested in determining the total time to recover for a Flink 
> application after experiencing a partial failure. Let's assume a 
> pipeline consisting of Kafka -> Flink -> Kafka with Exactly-Once 
> guarantees enabled.
> 
> Taking a look at the documentation 
> (https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/metrics.html), 
> one of the metrics which can be gathered is /recoveryTime/. However, as 
> far as I can tell this is only the time taken for the system to go from 
> an inconsistent state back into a consistent state, i.e. restarting the 
> job. Is there any way of measuring the amount of time taken from the 
> point when the failure occurred till the point when the system catches 
> up to the last message that was processed before the outage?
> 
> Thank you very much in advance!
> 
> Regards,
> Morgan.