Posted to user@flink.apache.org by Mason Chen <ma...@apple.com> on 2022/01/04 17:01:59 UTC

Re: unaligned checkpoint for job with large start delay

Hi Piotrek,

> In other words, something (presumably a watermark) has fired more than 151 200 windows at once, which is taking ~1h 10 minutes to process and during this time the checkpoint can not make any progress. Is this number of triggered windows plausible in your scenario?

It seems plausible—there are potentially many keys (and many windows). Is there a way to confirm this with metrics? We could add a window fire counter to the window operator that only gets incremented at the end of window evaluation, in order to see the huge jumps in window fires. I can see this benefiting other users who are troubleshooting the common problem of a large number of windows firing.
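
To make the idea concrete, here is a rough sketch of such a counter from the user-facing side (type names like Event and Result are placeholders; ideally the metric would live inside the window operator itself rather than in user code):

```
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// Sketch: count one "fire" per window evaluation so that huge spikes of
// simultaneous window fires show up as jumps in a single counter metric.
public class CountingWindowFunction
        extends ProcessWindowFunction<Event, Result, String, TimeWindow> {

    private transient Counter windowFires;

    @Override
    public void open(Configuration parameters) {
        windowFires = getRuntimeContext().getMetricGroup().counter("windowFires");
    }

    @Override
    public void process(String key, Context ctx, Iterable<Event> elements,
                        Collector<Result> out) {
        windowFires.inc(); // incremented once per window evaluation
        // ... existing window logic that emits the result ...
    }
}
```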

Best,
Mason

> On Dec 29, 2021, at 2:56 AM, Piotr Nowojski <pn...@apache.org> wrote:
> 
> Hi Mason,
> 
> > and it has to finish processing this output before checkpoint can begin—is this right?
> 
> Yes. The checkpoint will only be executed once all triggered windows have been fully processed. 
> 
> But from what you have posted it looks like all of that delay is coming from hundreds of thousands of windows firing all at the same time. Between 20:30 and ~21:40 there must have been a bit more than 36 triggers/s * 60s/min * 70min = 151 200 triggers fired at once (or in a very short interval). In other words, something (presumably a watermark) has fired more than 151 200 windows at once, which is taking ~1h 10 minutes to process and during this time the checkpoint can not make any progress. Is this number of triggered windows plausible in your scenario?
> 
> Best,
> Piotrek
> 
> 
> czw., 23 gru 2021 o 12:12 Mason Chen <mason.chen@apple.com <ma...@apple.com>> napisał(a):
> Hi Piotr,
> 
> Thanks for the thorough response and the PR—will review later.
> 
> Clarifications:
> 1. The flat map you refer to produces at most 1 record.
> 2. The session window operator’s window process function emits at least 1 record. 
> 3. The 25 ms sleep is at the beginning of the window process function.
> 
> Your explanation about how records being bigger than the buffer size can cause blockage makes sense to me. However, my average record size is around 770 bytes coming out of the source and 960 bytes coming out of the window. Also, we don’t override the default `taskmanager.memory.segment-size`. My Flink job memory config is as follows:
> 
> ```
>         taskmanager.memory.jvm-metaspace.size: 512 mb
>         taskmanager.memory.jvm-overhead.max: 2Gb
>         taskmanager.memory.jvm-overhead.min: 512Mb
>         taskmanager.memory.managed.fraction: '0.4'
>         taskmanager.memory.network.fraction: '0.2'
>         taskmanager.memory.network.max: 2Gb
>         taskmanager.memory.network.min: 200Mb
>         taskmanager.memory.process.size: 16Gb
>         taskmanager.numberOfTaskSlots: '4'
> ```
> 
>>  Are you sure your job is making any progress? Are records being processed? Hasn't your job simply deadlocked on something?
> 
> To distinguish task blockage from graceful backpressure, I have checked the operator throughput metrics and confirmed that during window task buffer blockage, the window operator DOES emit records. The tasks look like they aren’t doing anything, but the window is emitting records.
> 
> <throughput_metrics.png>
> 
> 
> Furthermore, I created a custom trigger to wrap a metric counter for FIRED counts to get an estimation of how many windows are fired at the same time. I ran a separate job with the same configs—the results look as follows:
> <trigger_metrics.png>
> 
> On average, when the buffers are blocked, there are 36 FIREs per second. Since each of these fires invokes the window process function, 25 ms * 36 = 900 ms means we sleep almost a full second cumulatively, per second—which is pretty severe. Combined with the fact that the window process function can emit many records, the task takes even longer to checkpoint, since the flatmap/kafka sink is chained with the window operator—and it has to finish processing this output before the checkpoint can begin—is this right? In addition, when the number of window fires per second drops, the checkpoint is able to continue and succeed.
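> 
> For reference, the counting trigger is essentially a thin wrapper around the default event time trigger, roughly like the sketch below (the metric name and the Object/TimeWindow type parameters are just placeholders for our setup):
> 
> ```
> import org.apache.flink.metrics.Counter;
> import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
> import org.apache.flink.streaming.api.windowing.triggers.Trigger;
> import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
> import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
> 
> // Sketch of a delegating trigger that counts FIRE decisions.
> public class CountingEventTimeTrigger extends Trigger<Object, TimeWindow> {
> 
>     private final EventTimeTrigger delegate = EventTimeTrigger.create();
>     private transient Counter fires;
> 
>     private TriggerResult count(TriggerResult result, TriggerContext ctx) {
>         if (fires == null) {
>             fires = ctx.getMetricGroup().counter("windowFired");
>         }
>         if (result.isFire()) {
>             fires.inc();
>         }
>         return result;
>     }
> 
>     @Override
>     public TriggerResult onElement(Object element, long timestamp, TimeWindow window,
>                                    TriggerContext ctx) throws Exception {
>         return count(delegate.onElement(element, timestamp, window, ctx), ctx);
>     }
> 
>     @Override
>     public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx)
>             throws Exception {
>         return count(delegate.onEventTime(time, window, ctx), ctx);
>     }
> 
>     @Override
>     public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx)
>             throws Exception {
>         return count(delegate.onProcessingTime(time, window, ctx), ctx);
>     }
> 
>     @Override
>     public boolean canMerge() {
>         return delegate.canMerge(); // needed for merging (session) windows
>     }
> 
>     @Override
>     public void onMerge(TimeWindow window, OnMergeContext ctx) throws Exception {
>         delegate.onMerge(window, ctx);
>     }
> 
>     @Override
>     public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
>         delegate.clear(window, ctx);
>     }
> }
> ```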
> 
> So, I think that the surge of window firing combined with the sleep is the source of the issue, which makes sense. I’m not sure how to confirm whether or not the points about buffer sizes being insufficient for the window output is also interplaying with this issue.
> 
> Best,
> Mason
> 
> 
>> On Dec 22, 2021, at 6:17 AM, Piotr Nowojski <pnowojski@apache.org <ma...@apache.org>> wrote:
>> 
>> Hi Mason,
>> 
>> One more question. Are you sure your job is making any progress? Are records being processed? Hasn't your job simply deadlocked on something?
>> 
>> Best,
>> Piotrek
>> 
>> śr., 22 gru 2021 o 10:02 Piotr Nowojski <pnowojski@apache.org <ma...@apache.org>> napisał(a):
>> Hi,
>> 
>> Thanks for getting back to us. This is indeed weird.
>> 
>> >> One of the unaligned checkpoints limitations is that Flink can not snapshot a state of an operator in the middle of processing a record.
>> >
>> >This is true for aligned checkpoints too, right?
>> 
>> In a sense. For aligned checkpoints there is a stronger limitation, that the task has to process all of the buffered records on the input before it's able to do an aligned checkpoint. For unaligned checkpoints the task has to finish fully processing only the currently processed record.
>> 
>> > 1. Why is there high start delay at the source? Isn’t this what FLIP 27 sources are designed to overcome since the need to acquire the checkpoint lock is irrelevant? Is it a bug?
>> 
>> Kind of. You do not have to acquire the checkpoint lock, as FLIP-27 sources are working in the task thread. But the task thread can not process records and do a checkpoint at the same time. A FLIP-27 source will not pick up the next record from the input if there is backpressure (that allows the checkpoint to be triggered while the task is back pressured), but this back pressure detection mechanism (or rather the mechanism that prevents blocking waits of the task thread when there is back pressure) is not perfect. A couple of the largest limitations are:
>> a) If your single record doesn't fit in a single network buffer, for example the network buffer default size is 32KB and your record size can reach 33KB, back pressure detection will allow the next record to be processed since there will be some buffer available, but the produced record won't fit into this single buffer and will have to blockingly wait for another buffer to be recycled (increasing start delay and/or alignment time).
>> b) If you have a flat map style operator/function in the chain that multiplies the number of records, you can hit exactly the same problem. For example, the network buffer is 32KB, the record size is 330B, but you have a flat map that suddenly produces 100 records (each 330B). 330B * 100 = 33KB, so again you might end up with the task being blocked as a single buffer wouldn't be enough to serialize all of those 100 records (see the sketch after this list). 
>> c) The same as b), but caused by a timer/watermark triggering WindowOperator to produce lots of records.
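>> 
>> To make b) concrete, a minimal sketch (with made-up Batch/Element types) of the kind of flat map that can expand a single input record into more than one network buffer worth of output:
>> 
>> ```
>> import org.apache.flink.api.common.functions.FlatMapFunction;
>> import org.apache.flink.util.Collector;
>> 
>> // Sketch: one small input record fans out into e.g. 100 output records,
>> // so a single invocation can produce more than 32KB of serialized output
>> // and block while waiting for additional network buffers.
>> public class ExpandingFlatMap implements FlatMapFunction<Batch, Element> {
>>     @Override
>>     public void flatMap(Batch batch, Collector<Element> out) {
>>         for (Element element : batch.getElements()) { // e.g. 100 elements of ~330B
>>             out.collect(element);
>>         }
>>     }
>> }
>> ```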
>> 
>> > 2. When the source operator finished for checkpoint 337, why is start delay high for the window? Barriers should have been forwarded downstream quite quickly unless the window operator is blocking for a few hours...
>> 
>> Actually, all of those points apply to every task, not only the FLIP-27 source task, and maybe they could explain why the window/flat map task has been blocked for ~2.5h. 
>> 
>> Re 1. + 2. If your Window/Flat Map task can block for 6 hours, and your record size sometimes exceeds the network buffer size, this can cause the source task to be blocked for those 6 hours. The source task will simply be stuck waiting for a buffer to be recycled, and this will only happen once the downstream task processes one more buffer. 
>> 
>> > 3. If the window is the bottleneck, what are the various ways to confirm this? We have metrics to measure the process function but we don’t know how many windows are getting fired at the same time to give the overall latency for the operator. Are there metrics or logs to see how many windows are getting fired or how long the window operator is blocking the window input buffers from processing?
>> 
>> In the webUI the task nodes are colored depending on the busy/backpressured time. You can clearly see that the source is fully backpressured all the time, while the window is constantly busy. I presume your function that is inducing the 25ms per record sleep time is chained with the window. That confirms for me that the window task is the bottleneck. However, unfortunately there is no easy way to tell how severe this back pressure is or for how long those tasks are blocked. In other words, a task that is busy processing records for 1ms every 1ns and a task that is blocked busy processing a single record for 6h will both have the same 100% Busy metric. The same goes for being blocked on back pressure (both a task back pressured for 1ms every 1ns and a task back pressured 1h every 1ns will have a 100% back pressure metric). Moreover, there is currently no way to distinguish whether a task is back pressured in a graceful way, without blocking the task thread, or whether it is indeed blocking the task thread (due to a), b) or c)). I have created a ticket to add some metrics to help with that [1], but it won't help you right now.
>> 
>> I. You could do some estimations on paper if anything that I have written above can theoretically happen. You should know the size of the windows/record sizes/what your Flat Map functions are doing (it seems like you have two of those chained after the WindowOperator?). From the looks of it, 25ms sleep per record, WindowOperator + Flat Map, huge state of window operators might suggest that it's possible.
>> II.  As those tasks are blocked for hours, triggering a checkpoint and collecting some stack traces can help you understand what the tasks are actually doing. But for that you would need to understand how to differentiate a blocked task, so...
>> III. ... maybe actually the most efficient way for us to help you would be if you could minimize/simplify your job, replace Kafka source with an artificial source that would be generating records, but in such a way that would still reproduce this behavior and share your code with us?
>> 
>> Best, Piotrek
>> 
>> [1] https://issues.apache.org/jira/browse/FLINK-25414
>> 
>> 
>> wt., 21 gru 2021 o 20:10 Mason Chen <mason.chen@apple.com <ma...@apple.com>> napisał(a):
>> Hi Piotr,
>> 
>> These observations correspond to the 0ms alignment timeout setting.
>> 
>> The checkpoints are timing out because the checkpoint acknowledgement is timing out. Now, we have increased the timeout to 3 hours in our checkpoints and we still face errors due to checkpoint acknowledgement—the rest of the checkpoint config is still the same.
>> 
>> This is our job graph:
>> <job_graph.png>
>> To give more details about the window, we use the default event time trigger with a gap of 300 seconds and an allowed lateness of 180. The window only implements the process function, in which it emits 1 element.
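>> 
>> In code, the windowed part of the job looks roughly like the sketch below (Event, Result and the function name are placeholders; the 25 ms sleep mentioned earlier sits at the top of the process function):
>> 
>> ```
>> import org.apache.flink.streaming.api.datastream.DataStream;
>> import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
>> import org.apache.flink.streaming.api.windowing.time.Time;
>> 
>> // Sketch: 300s session gap, allowed lateness, default event time trigger,
>> // and a process function that emits a single element per fired window.
>> DataStream<Result> windowed = events
>>         .keyBy(Event::getKey)
>>         .window(EventTimeSessionWindows.withGap(Time.seconds(300)))
>>         .allowedLateness(Time.seconds(180))      // 180 allowed lateness (seconds used here for illustration)
>>         .process(new EmitOneElementFunction());  // Thread.sleep(25) at the start of process()
>> ```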
>> 
>> Here are the screenshots of the failed checkpoints. Failures typically come in groups like this. On average, checkpoints complete in 2m 49s.
>> 
>> <failed_checkpoint_summary.png>
>> 
>> To show a few of the failed checkpoints in more detail:
>> 
>> For checkpoint 337, the source finishes checkpoint within a normal latency and the window checkpoint times out due to high start delay.
>> <checkpoint_337.png>
>> 
>> For checkpoint 338, we see very high start delay at the source, which blocks the window operator from completing its checkpoint. I sorted the subtasks by end-to-end duration to give an idea of the worst start delay. Start delay even shows values beyond our checkpoint timeout (e.g. 4, 5, 6 hours).
>> <checkpoint_338.png>
>> 
>> 
>>> One of the unaligned checkpoints limitations is that Flink can not snapshot a state of an operator in the middle of processing a record.
>> 
>> This is true for aligned checkpoints too, right?
>> 
>> So my questions are:
>> 
>> 1. Why is there high start delay at the source? Isn’t this what FLIP 27 sources are designed to overcome since the need to acquire the checkpoint lock is irrelevant? Is it a bug?
>> 2. When the source operator finished for checkpoint 337, why is start delay high for the window? Barriers should have been forwarded downstream quite quickly unless the window operator is blocking for a few hours...
>> 3. If the window is the bottleneck, what are the various ways to confirm this? We have metrics to measure the process function but we don’t know how many windows are getting fired at the same time to give the overall latency for the operator. Are there metrics or logs to see how many windows are getting fired or how long the window operator is blocking the window input buffers from processing?
>> 
>> Thanks,
>> Mason
>> 
>> 
>>> On Dec 20, 2021, at 3:01 AM, Piotr Nowojski <pnowojski@apache.org <ma...@apache.org>> wrote:
>>> 
>>> Hi Mason,
>>> 
>>> Have you already observed those checkpoint timeouts (30 minutes) with the alignment timeout set to 0ms? Or was that while you were still running with the 1s alignment timeout?
>>> 
>>> If the latter, it might be because unaligned checkpoints are failing to kick in in the first place. Setting the timeout to 0ms should solve the problem.
>>> 
>>> If the former, have you checked why the checkpoints are timing out? What part of the checkpointing process is taking a long time? For example, can you post a screenshot from the WebUI of checkpoint stats for each task? The only explanation I could think of is this sleep time that you added. 25ms per record is really a lot. I mean really a lot. 30 minutes / 25 ms/record = 72 000 records. One of the unaligned checkpoints limitations is that Flink can not snapshot the state of an operator in the middle of processing a record. In your particular case, Flink will not be able to snapshot the state of the session window operator in the middle of the windows being fired. If your window operator is firing a lot of windows at the same time, or a single window is producing 72k records (which would be an unusual but not unimaginable amount), this could block checkpointing of the window operator for 30 minutes due to this 25ms sleep downstream.
>>> 
>>> Piotrek
>>> 
>>> pt., 17 gru 2021 o 19:19 Mason Chen <mason.chen@apple.com <ma...@apple.com>> napisał(a):
>>> Hi Piotr,
>>> 
>>> Thanks for the link to the JIRA ticket, we actually don’t see much state size overhead between checkpoints in aligned vs unaligned, so we will go with your recommendation of using unaligned checkpoints with 0s alignment timeout.
>>> 
>>> For context, we are testing unaligned checkpoints with our application with these tasks: [kafka source, map, filter] -> keyby -> [session window] -> [various kafka sinks]. The first task has parallelism 40 and the rest of the tasks have parallelism 240. This is the FLIP 27 Kafka source.
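>>> 
>>> As a rough sketch of how the source side of that topology is wired up (connection details are placeholders and the watermark strategy is only illustrative):
>>> 
>>> ```
>>> import org.apache.flink.api.common.eventtime.WatermarkStrategy;
>>> import org.apache.flink.api.common.serialization.SimpleStringSchema;
>>> import org.apache.flink.connector.kafka.source.KafkaSource;
>>> import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
>>> import org.apache.flink.streaming.api.datastream.DataStream;
>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>> 
>>> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>> 
>>> // FLIP-27 Kafka source (all connection details are placeholders).
>>> KafkaSource<String> source = KafkaSource.<String>builder()
>>>         .setBootstrapServers("kafka:9092")
>>>         .setTopics("input-topic")
>>>         .setGroupId("session-window-job")
>>>         .setStartingOffsets(OffsetsInitializer.latest())
>>>         .setValueOnlyDeserializer(new SimpleStringSchema())
>>>         .build();
>>> 
>>> // The [kafka source, map, filter] chain runs at parallelism 40; the
>>> // keyBy -> session window -> kafka sinks part runs at parallelism 240.
>>> DataStream<String> raw = env
>>>         .fromSource(source, WatermarkStrategy.<String>forMonotonousTimestamps(), "kafka-source")
>>>         .setParallelism(40);
>>> ```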
>>> 
>>> We added an artificial sleep (25 ms per invocation of the process function) to the session window task to simulate backpressure; however, we still see checkpoints failing because task acknowledgement doesn’t complete within our checkpoint timeout (30 minutes).
>>> 
>>> I am able to correlate that the input buffers from window and the output buffers from source being at 100% usage corresponds to the checkpoint failures. When they are not full (input can drop to as low as 60% usage and output can drop to as low as 55% usage), the checkpoints succeed within less than 2 ms. In all cases, it is the session window task or the source task failing to 100% acknowledge the barriers within the timeout. I do see the source task acknowledgement taking a long time in some of the failures (e.g. 20 minutes, 30 minutes, 50 minutes, 1 hour, 2 hours) while the source is idle and not busy at this time.
>>> 
>>> All other input buffers are at low usage (mostly 0). For the output buffer, the usage is around 50% for the window--everything else is near 0% all the time except the source mentioned before (which makes sense since the rest are just sinks).
>>> 
>>> We are also running a parallel Flink job with the same configuration, except with unaligned checkpoints disabled. Here we observe the same behavior, except now some of the checkpoints are failing due to the source task not acknowledging everything within the timeout—however, most failures are still due to session window acknowledgement.
>>> 
>>> All the data seems to point to an issue with the source? Now, I don’t know how to explain this behavior, since unaligned checkpoints should overtake records in the buffers (once a barrier is seen at the input buffer, it is forwarded immediately downstream to the output buffer).
>>> 
>>> Just to confirm, this is our checkpoint configuration:
>>> ```
>>> Option                              Value
>>> Checkpointing Mode                  Exactly Once
>>> Checkpoint Storage                  FileSystemCheckpointStorage
>>> State Backend                       EmbeddedRocksDBStateBackend
>>> Interval                            5m 0s
>>> Timeout                             30m 0s
>>> Minimum Pause Between Checkpoints   2m 0s
>>> Maximum Concurrent Checkpoints      1
>>> Unaligned Checkpoints               Enabled
>>> Persist Checkpoints Externally      Enabled (retain on cancellation)
>>> Tolerable Failed Checkpoints        10
>>> ```
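>>> 
>>> For reference, in the DataStream API this configuration corresponds roughly to the following sketch (the checkpoint path is a placeholder):
>>> 
>>> ```
>>> import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
>>> import org.apache.flink.streaming.api.CheckpointingMode;
>>> import org.apache.flink.streaming.api.environment.CheckpointConfig;
>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>> 
>>> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>> env.setStateBackend(new EmbeddedRocksDBStateBackend());
>>> 
>>> env.enableCheckpointing(5 * 60 * 1000, CheckpointingMode.EXACTLY_ONCE);            // 5m interval
>>> CheckpointConfig checkpointConfig = env.getCheckpointConfig();
>>> checkpointConfig.setCheckpointStorage("hdfs:///checkpoints/session-window-job");   // placeholder path
>>> checkpointConfig.setCheckpointTimeout(30 * 60 * 1000);                             // 30m
>>> checkpointConfig.setMinPauseBetweenCheckpoints(2 * 60 * 1000);                     // 2m
>>> checkpointConfig.setMaxConcurrentCheckpoints(1);
>>> checkpointConfig.enableUnalignedCheckpoints();
>>> checkpointConfig.setTolerableCheckpointFailureNumber(10);
>>> checkpointConfig.enableExternalizedCheckpoints(
>>>         CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>>> ```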
>>> 
>>> Are there other metrics I should look at—why else would tasks fail acknowledgement in unaligned mode? Is it something about the implementation details of the window function that I am not considering? My main hunch is that it is something to do with the source.
>>> 
>>> Best,
>>> Mason
>>> 
>>>> On Dec 16, 2021, at 12:25 AM, Piotr Nowojski <pnowojski@apache.org <ma...@apache.org>> wrote:
>>>> 
>>>> Hi Mason,
>>>> 
>>>> In Flink 1.14 we have also changed the timeout behavior from checking against the alignment duration to simply checking how old the checkpoint barrier is (so it also accounts for the start delay) [1]. It was done in order to solve problems like the one you are describing. Unfortunately we can not backport this change to 1.13.x as it's a breaking change.
>>>> 
>>>> Anyway, from our experience I would recommend going all in with the unaligned checkpoints, so setting the timeout back to the default value of 0ms. With timeouts you are gaining very little (a tiny bit smaller state size if there is no backpressure - tiny bit because without backpressure, even with the timeout set to 0ms, the amount of captured inflight data is basically insignificant), while in practice you slow down the checkpoint barriers propagation time by quite a lot.
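>>>> 
>>>> In 1.13 that is roughly the following (sketch; if I remember correctly, the alignment timeout setter was renamed in later releases):
>>>> 
>>>> ```
>>>> import java.time.Duration;
>>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>> 
>>>> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>> 
>>>> // Go "all in" on unaligned checkpoints: no aligned-checkpoint timeout at all.
>>>> env.getCheckpointConfig().enableUnalignedCheckpoints();
>>>> env.getCheckpointConfig().setAlignmentTimeout(Duration.ZERO);
>>>> ```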
>>>> 
>>>> Best,
>>>> Piotrek
>>>> 
>>>> [1] https://issues.apache.org/jira/browse/FLINK-23041
>>>> wt., 14 gru 2021 o 22:04 Mason Chen <mas.chen6345@gmail.com <ma...@gmail.com>> napisał(a):
>>>> Hi all,
>>>> 
>>>> I'm using Flink 1.13 and my job is experiencing high start delay, more so than high alignment time (our FLIP-27 Kafka source is heavily backpressured). Since our alignment timeout is set to 1s, the unaligned checkpoint never triggers, since the alignment delay is always below the threshold.
>>>> 
>>>> It seems there is only a configuration for the alignment timeout, but should there also be one for a start delay timeout: https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpointing_under_backpressure/#aligned-checkpoint-timeout
>>>> 
>>>> I'm interested to know the reasoning why there isn't a timeout for start delay as well--was it because it was deemed too complex for the user to configure two parameters for unaligned checkpoints?
>>>> 
>>>> I'm aware of buffer debloating in 1.14 that could help but I'm trying to see how far unaligned checkpointing can take me.
>>>> 
>>>> Best,
>>>> Mason
>>> 
>> 
> 


Re: unaligned checkpoint for job with large start delay

Posted by Piotr Nowojski <pn...@apache.org>.
Hi Thias and Mason,

> state-backend-rocksdb-metrics-estimate-num-keys

Indeed that can be a good indicator. However keep in mind that, depending
on your logic, there might be many existing windows for each key.

>  However, it’s not so clear how to count the windows that have been
registered since the window assigner does not expose the run time
context—is this even the right place to count?

Yes, I think you are unfortunately right. I've looked at the code, and it
wouldn't even be that easy to add such a metric. Sorry for misleading you.
But a spike in triggered windows is a strong indication that they were
triggered all at once.

> Perhaps, it can be an opt-in feature? I do see it being really useful
since most users aren’t really familiar with windows and these metrics can
help easily identify the common problem of too many windows firing.
> The additional metrics certainly help in diagnosing some of the symptoms
of the root problem.

I will think about how to solve it. I would be against an opt in metric, as
it would complicate code and configuration for the users while barely
anyone would use it.

Note that huge checkpoint start delay with unaligned checkpoints already
confirms that the system has been blocked by something. As I mentioned
before, there are a number of reasons why: record size larger than buffer
size, flatMap functions/operators multiplying number of records, large
number of timers fired at once. Summing up everything that you have
reported so far, we ruled out the former two options, and the spike in the
number of triggered windows almost confirms that this is the issue at hand.

Best,
Piotrek

śr., 12 sty 2022 o 08:32 Schwalbe Matthias <Ma...@viseca.ch>
napisał(a):

> Hi Mason,
>
>
>
> Since you are using RocksDB, you could enable this metric [1]
> state-backend-rocksdb-metrics-estimate-num-keys which gives (afaik) a good
> indication of the number of active windows.
>
> I’ve never seen (despite the warning) a negative effect on the runtime.
>
>
>
> Hope this helps …
>
>
>
> Thias
>
>
>
>
>
>
>
>
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/config/#state-backend-rocksdb-metrics-estimate-num-keys
>
>
>
> *From:* Mason Chen <ma...@apple.com>
> *Sent:* Dienstag, 11. Januar 2022 19:20
> *To:* Piotr Nowojski <pn...@apache.org>
> *Cc:* Mason Chen <ma...@gmail.com>; user <us...@flink.apache.org>
> *Subject:* Re: unaligned checkpoint for job with large start delay
>
>
>
> Hi Piotrek,
>
>
>
> No worries—I hope you had a good break.
>
>
>
> Counting how many windows have been registered/fired and plotting that
> over time.
>
> It’s straightforward to count windows that are fired (the trigger exposes
> the run time context and we can collect the information in that code path).
> However, it’s not so clear how to count the windows that have been
> registered since the window assigner does not expose the run time
> context—is this even the right place to count? It’s not necessarily the
> case that an assignment results in a new window registered. Am I missing
> anything else relevant from the user facing interface perspective?
>
>
>
>  Unfortunately at the moment I don't know how to implement such a metric
> without affecting performance on the critical path, so I don't see this
> happening soon :(
>
> Perhaps, it can be an opt-in feature? I do see it being really useful
> since most users aren’t really familiar with windows and these metrics can
> help easily identify the common problem of too many windows firing.
>
>
>
> The additional metrics certainly help in diagnosing some of the symptoms
> of the root problem.
>
>
>
> Best,
>
> Mason

RE: unaligned checkpoint for job with large start delay

Posted by Schwalbe Matthias <Ma...@viseca.ch>.
Hi Mason,

Since you are using RocksDB, you could enable this metric [1], state-backend-rocksdb-metrics-estimate-num-keys, which gives (afaik) a good indication of the number of active windows.
I’ve never seen (despite the warning) a negative effect on the runtime.
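
In flink-conf.yaml that is a single line (the option is off by default):

```
state.backend.rocksdb.metrics.estimate-num-keys: true
```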

Hope this helps …

Thias




[1] https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/config/#state-backend-rocksdb-metrics-estimate-num-keys

From: Mason Chen <ma...@apple.com>
Sent: Dienstag, 11. Januar 2022 19:20
To: Piotr Nowojski <pn...@apache.org>
Cc: Mason Chen <ma...@gmail.com>; user <us...@flink.apache.org>
Subject: Re: unaligned checkpoint for job with large start delay

Hi Piotrek,

No worries—I hope you had a good break.

Counting how many windows have been registered/fired and plotting that over time.
It’s straightforward to count windows that are fired (the trigger exposes the run time context and we can collect the information in that code path). However, it’s not so clear how to count the windows that have been registered since the window assigner does not expose the run time context—is this even the right place to count? It’s not necessarily the case that an assignment results in a new window registered. Am I missing anything else relevant from the user facing interface perspective?

 Unfortunately at the moment I don't know how to implement such a metric without affecting performance on the critical path, so I don't see this happening soon :(
Perhaps, it can be an opt-in feature? I do see it being really useful since most users aren’t really familiar with windows and these metrics can help easily identify the common problem of too many windows firing.

The additional metrics certainly help in diagnosing some of the symptoms of the root problem.

Best,
Mason


On Jan 10, 2022, at 1:00 AM, Piotr Nowojski <pn...@apache.org>> wrote:

Hi Mason,

Sorry for a late reply, but I was OoO.

I think you could confirm it with more custom metrics. Counting how many windows have been registered/fired and plotting that over time.

I think it would be more helpful in this case to check how long a task has been blocked being "busy" processing, for example, timers. FLINK-25414 only covers being blocked on hard/soft backpressure. Unfortunately at the moment I don't know how to implement such a metric without affecting performance on the critical path, so I don't see this happening soon :(

Best,
Piotrek

wt., 4 sty 2022 o 18:02 Mason Chen <ma...@apple.com>> napisał(a):
Hi Piotrek,


In other words, something (presumably a watermark) has fired more than 151 200 windows at once, which is taking ~1h 10minutes to process and during this time the checkpoint can not make any progress. Is this number of triggered windows plausible in your scenario?

It seems plausible—there are potentially many keys (and many windows). Is there a way to confirm with metrics? We can add a window fire counter to the window operator that only gets incremented at the end of windows evaluation, in order to see the huge jumps in window fires. I can this benefiting other users who troubleshoot the problem of large number of window firing.

Best,
Mason


On Dec 29, 2021, at 2:56 AM, Piotr Nowojski <pn...@apache.org>> wrote:

Hi Mason,

> and it has to finish processing this output before checkpoint can begin—is this right?

Yes. Checkpoint will be only executed once all triggered windows will be fully processed.

But from what you have posted it looks like all of that delay is coming from hundreds of thousands of windows firing all at the same time. Between 20:30 and ~21:40 there must have been a bit more than 36 triggers/s * 60s/min * 70min = 151 200triggers fired at once (or in a very short interval). In other words, something (presumably a watermark) has fired more than 151 200 windows at once, which is taking ~1h 10minutes to process and during this time the checkpoint can not make any progress. Is this number of triggered windows plausible in your scenario?

Best,
Piotrek


czw., 23 gru 2021 o 12:12 Mason Chen <ma...@apple.com>> napisał(a):
Hi Piotr,

Thanks for the thorough response and the PR—will review later.

Clarifications:
1. The flat map you refer to produces at most 1 record.
2. The session window operator’s window process function emits at least 1 record.
3. The 25 ms sleep is at the beginning of the window process function.

Your explanation about how records being bigger than the buffer size can cause blockage makes sense to me. However, my average record size is around 770 bytes coming out of the source and 960 bytes coming out of the window. Also, we don’t override the default `taskmanager.memory.segment-size`. My Flink job memory config is as follows:

```
taskmanager.memory.jvm-metaspace.size: 512 mb
taskmanager.memory.jvm-overhead.max: 2Gb
taskmanager.memory.jvm-overhead.min: 512Mb
taskmanager.memory.managed.fraction: '0.4'
taskmanager.memory.network.fraction: '0.2'
taskmanager.memory.network.max: 2Gb
taskmanager.memory.network.min: 200Mb
taskmanager.memory.process.size: 16Gb
taskmanager.numberOfTaskSlots: '4'
```

 Are you sure your job is making any progress? Are records being processed? Hasn't your job simply deadlocked on something?

To distinguish task blockage vs graceful backpressure, I have checked the operator throughput metrics and have confirmed that during window task buffer blockage, the window operator DOES emit records. Tasks look like they aren’t doing anything but the window is emitting records.

<throughput_metrics.png>


Furthermore, I created a custom trigger to wrap a metric counter for FIRED counts to get a estimation of how many windows are fired at the same time. I ran a separate job with the same configs—the results look as follows:
<trigger_metrics.png>

On average, when the buffers are blocked, there are 36 FIREs per second. Since each of these fires invokes the window process function, 25 ms * 36 = 900 ms means we sleep almost a second cumulatively, per second—which is pretty severe. Combined with the fact that the window process function can emit many records, the task takes even longer to checkpoint since the flatmap/kafka sink is chained with the window operator—and it has to finish processing this output before checkpoint can begin—is this right? In addition, when the window fires per second reduces, checkpoint is able to continue and succeed.

So, I think that the surge of window firing combined with the sleep is the source of the issue, which makes sense. I’m not sure how to confirm whether or not the points about buffer sizes being insufficient for the window output is also interplaying with this issue.

Best,
Mason



On Dec 22, 2021, at 6:17 AM, Piotr Nowojski <pn...@apache.org>> wrote:

Hi Mason,

One more question. Are you sure your job is making any progress? Are records being processed? Hasn't your job simply deadlocked on something?

Best,
Piotrek

śr., 22 gru 2021 o 10:02 Piotr Nowojski <pn...@apache.org>> napisał(a):
Hi,

Thanks for getting back to us. This is indeed weird.

>> One of the unaligned checkpoints limitations is that Flink can not snapshot a state of an operator in the middle of processing a record.
>
>This is true for aligned checkpoints too, right?

In a sense. For aligned checkpoints there is a stronger limitation, that the task has to process all of the buffered records on the input before it's able to do an aligned checkpoint. For unaligned checkpoints the task has to finish fully processing only the currently processed record.

> 1. Why is there high start delay at the source? Isn’t this what FLIP 27 sources are designed to overcome since the need to acquire the checkpoint lock is irrelevant? Is it a bug?

Kind of. You do not have to acquire checkpoint lock, as FLIP-27 sources are working in the task thread. But the task thread can not process records and do a checkpoint at the same time. FLIP-27 source will not pick up a next record from the input if there is a backpressure (that allows checkpoint to be triggered while task is back pressured), but this back pressure detection mechanism (or rather mechanism that prevents blocking waits of the task thread when there is a back pressure) is not perfect. A couple of the largests limitations are:
a) If your single record doesn't fit in a single network buffer, for example network buffer default size is 32KB and your record size can reach 33KB, back pressure detection will allow to process next record since there will be some buffer available, but the produced record won't fit into this single buffer and will have to blockingly wait for another buffer to be recycled (increasing start delay and/or alignment time).
b) If you have a flat map style operator/function in the chain, that multiplies the number of records you can hit exactly the same problem. For example, the network buffer is 32KB, record size is 330B, but you have a flat map that suddenly produces 100 records (each 330B). 330B * 100 = 33KB so again you might end up with the task being blocked as a single buffer wouldn't be enough to serialize all of those 100 records.
c) The same as b), but caused by a timer/watermark triggering WindowOperator to produce lots of records.

> 2. When the source operator finished for checkpoint 337, why is start delay high for the window? Barriers should have been forwarded downstream quite quickly unless the window operator is blocking for a few hours...

All of those points apply actually to every task, not only FLIP-27 source task and maybe they could explain why the window/flat map task has been blocked for ~2.5h.

Re 1. + 2. If your Window/Flat Map task can block for 6 hours, and your record size is sometimes exceeding network buffer size, this can cause the source task to be blocked for those 6 hours. Source task will be simply stuck waiting for a buffer to be recycled, and this will only happen once a downstream task will process one more buffer.

> 3. If the window is the bottleneck, what are the various ways to confirm this? We have metrics to measure the process function but we don’t know how many windows are getting fired at the same time to give the overall latency for the operator. Are there metrics or logs to see how many windows are getting fired or how long the window operator is blocking the window input buffers from processing?

In the webUI the task nodes are colored depending on the busy/backpressured time. You can clearly see that the source is fully backpressured all the time, while the window is constantly busy. I presume your function that is inducing 25ms per record sleep time is chained with the window. That confirms for me that the window task is the bottleneck. However unfortunately there is no easy way to tell how severe this back pressure and for how long those tasks are blocked. In other words, a task that is busy processing records for 1ms every 1ns and a Task that is blocked busy processing a single record for 6h will both have the same 100% Busy metric. Same goes for blocked on the back pressure (both task back pressured for 1ms every 1ns and task back pressured 1h every 1ns will have 100% back pressure metric). Moreover there is currently no way to distinguish if a task is back pressured in a graceful way, without blocking the task thread, or if it is indeed blocking the task thread (due to a), b) or c)). I have created a ticket to add some metrics to help with that [1], but it won't help you right now.

I. You could do some estimations on paper to check whether anything that I have written above can theoretically happen. You should know the size of the windows/record sizes/what your Flat Map functions are doing (it seems like you have two of those chained after the WindowOperator?). From the looks of it (25ms sleep per record, WindowOperator + Flat Map, huge window operator state), it seems possible.
II. As those tasks are blocked for hours, triggering a checkpoint and collecting some stack traces can help you understand what the tasks are actually doing. But for that you would need to understand how to differentiate a blocked task, so...
III. ... maybe the most efficient way for us to help you would be if you could minimize/simplify your job, replacing the Kafka source with an artificial source that generates records in a way that still reproduces this behavior, and share your code with us? (A minimal sketch of such a source follows below.)
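
For point III, a minimal sketch of such a stand-in source, assuming Flink 1.13's FLIP-27 NumberSequenceSource; the timestamp assignment and the rest of the pipeline are placeholders that would need to be adapted so the session windows still behave like the real job:

```
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ArtificialSourceRepro {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Artificial FLIP-27 source standing in for the Kafka source; the sequence number
        // doubles as the event timestamp here, which would need tuning to mimic real traffic.
        DataStream<Long> events = env.fromSource(
                new NumberSequenceSource(0, Long.MAX_VALUE),
                WatermarkStrategy.<Long>forMonotonousTimestamps()
                        .withTimestampAssigner((n, ts) -> n),
                "artificial-source");

        // ... keyBy + session window + sinks as in the real job ...
        events.print();

        env.execute("artificial-source-repro");
    }
}
```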

Best, Piotrek

[1] https://issues.apache.org/jira/browse/FLINK-25414


wt., 21 gru 2021 o 20:10 Mason Chen <ma...@apple.com>> napisał(a):
Hi Piotr,

These observations correspond to the 0ms alignment timeout setting.

The checkpoints are timing out because the checkpoint acknowledgement is timing out. We have since increased the checkpoint timeout to 3 hours and we still see failures due to checkpoint acknowledgement; the rest of the checkpoint config is unchanged.

This is our job graph:
<job_graph.png>
To give more details about the window: we use the default event time trigger with a session gap of 300 seconds and an allowed lateness of 180. The window only implements the process function, which emits 1 element.
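
Spelled out as code, that wiring looks roughly like the sketch below (assumptions: the 180 allowed lateness is in seconds, Event and its getKey are placeholders for the real record type, and the output is simplified to a String):

```
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class SessionWindowWiring {
    // Event and Event::getKey are placeholders for the job's actual record type and key.
    static DataStream<String> apply(DataStream<Event> events) {
        return events
                .keyBy(Event::getKey)
                .window(EventTimeSessionWindows.withGap(Time.seconds(300))) // default event time trigger
                .allowedLateness(Time.seconds(180))
                .process(new ProcessWindowFunction<Event, String, String, TimeWindow>() {
                    @Override
                    public void process(String key, Context ctx, Iterable<Event> elements,
                                        Collector<String> out) {
                        out.collect(key); // the real function emits exactly one element per fire
                    }
                });
    }
}
```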

Here are the screenshots of the failed checkpoints. Failures typically come in groups like this. On average, checkpoints complete in 2m 49s.

<failed_checkpoint_summary.png>

To show a few of the failed checkpoints in more detail:

For checkpoint 337, the source finishes its checkpoint with normal latency, while the window checkpoint times out due to high start delay.
<checkpoint_337.png>

For checkpoint 338, we see very high start delay at the source, which blocks the window operator from completing its checkpoint. I sorted the subtasks by end-to-end duration to give an idea of the worst start delay. Start delay even shows values beyond our checkpoint timeout (e.g. 4, 5, 6 hours).
<checkpoint_338.png>



> One of the unaligned checkpoints limitations is that Flink can not snapshot a state of an operator in the middle of processing a record.

This is true for aligned checkpoints too, right?

So my questions are:

1. Why is there high start delay at the source? Isn’t this what FLIP 27 sources are designed to overcome since the need to acquire the checkpoint lock is irrelevant? Is it a bug?
2. When the source operator finished for checkpoint 337, why is start delay high for the window? Barriers should have been forwarded downstream quite quickly unless the window operator is blocking for a few hours...
3. If the window is the bottleneck, what are the various ways to confirm this? We have metrics to measure the process function but we don’t know how many windows are getting fired at the same time to give the overall latency for the operator. Are there metrics or logs to see how many windows are getting fired or how long the window operator is blocking the window input buffers from processing?

Thanks,
Mason



On Dec 20, 2021, at 3:01 AM, Piotr Nowojski <pn...@apache.org>> wrote:

Hi Mason,

Have you already observed those checkpoint timeouts (30 minutes) with the alignment timeout set to 0ms, or only while you were previously running with the 1s alignment timeout?

If the latter, it might be because unaligned checkpoints are failing to kick in in the first place. Setting the timeout to 0ms should solve the problem.

If the former, have you checked why the checkpoints are timing out? What part of the checkpointing process is taking a long time? For example, can you post a screenshot from the WebUI of the checkpoint stats for each task? The only explanation I can think of is the sleep time that you added. 25ms per record is really a lot. I mean really a lot: 30 minutes / 25 ms/record = 72 000 records. One of the unaligned checkpoint limitations is that Flink can not snapshot the state of an operator in the middle of processing a record. In your particular case, Flink will not be able to snapshot the state of the session window operator in the middle of the windows being fired. If your window operator is firing a lot of windows at the same time, or a single window is producing 72k records (which would be an unusual but not unimaginable amount), this could block checkpointing of the window operator for 30 minutes due to this 25ms sleep downstream.

Piotrek

pt., 17 gru 2021 o 19:19 Mason Chen <ma...@apple.com>> napisał(a):
Hi Piotr,

Thanks for the link to the JIRA ticket. We actually don’t see much state size overhead between aligned and unaligned checkpoints, so we will go with your recommendation of using unaligned checkpoints with a 0s alignment timeout.

For context, we are testing unaligned checkpoints with our application with these tasks: [kafka source, map, filter] -> keyby -> [session window] -> [various kafka sinks]. The first task has parallelism 40 and the rest of the tasks have parallelism 240. This is the FLIP 27 Kafka source.

We added an artificial sleep (25 ms per invocation of the process function) to the session window task to simulate backpressure; however, we still see checkpoints failing because task acknowledgement doesn’t complete within our checkpoint timeout (30 minutes).
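
Concretely, the artificial sleep is just a blocking call inside the window's process function, roughly like this (a sketch only; Event is a placeholder for the real record type and the output is simplified to a String):

```
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// Sketch of the artificial backpressure: a 25ms blocking sleep per window evaluation.
public class SlowSessionWindowFunction extends ProcessWindowFunction<Event, String, String, TimeWindow> {
    @Override
    public void process(String key, Context ctx, Iterable<Event> elements, Collector<String> out)
            throws Exception {
        Thread.sleep(25); // simulated slow processing, once per invocation
        out.collect(key); // the real function emits its single aggregated element here
    }
}
```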

I am able to correlate 100% usage of the window's input buffers and the source's output buffers with the checkpoint failures. When they are not full (input can drop to as low as 60% usage and output to as low as 55% usage), the checkpoints succeed in less than 2 ms. In all cases, it is the session window task or the source task failing to fully acknowledge the barriers within the timeout. I do see the source task acknowledgement taking long in some of the failures (e.g. 20 minutes, 30 minutes, 50 minutes, 1 hour, 2 hours), and the source is idle and not busy at this time.

All other input buffers show low usage (mostly 0). For output buffers, usage is around 50% for the window; everything else is near 0% all the time except the source mentioned before (which makes sense, since the rest are just sinks).

We are also running a parallel Flink job with the same configuration, except with unaligned checkpoints disabled. Here we observe the same behavior, except now some of the checkpoints are failing due to the source task not acknowledging everything within the timeout; however, most failures are still due to session window acknowledgement.

All the data seems to point to an issue with the source? I don’t know how to explain this behavior, since unaligned checkpoint barriers should overtake records in the buffers (once seen at the input buffer, they are forwarded immediately downstream to the output buffer).

Just to confirm, this is our checkpoint configuration:
```
Option	Value
Checkpointing Mode	Exactly Once
Checkpoint Storage	FileSystemCheckpointStorage
State Backend	EmbeddedRocksDBStateBackend
Interval	5m 0s
Timeout	30m 0s
Minimum Pause Between Checkpoints	2m 0s
Maximum Concurrent Checkpoints	1
Unaligned Checkpoints	Enabled
Persist Checkpoints Externally	Enabled (retain on cancellation)
Tolerable Failed Checkpoints	10
```

Are there other metrics I should look at, and why else would tasks fail to acknowledge in unaligned mode? Is it something about the implementation details of the window function that I am not considering? My main hunch is that it is something to do with the source.

Best,
Mason


On Dec 16, 2021, at 12:25 AM, Piotr Nowojski <pn...@apache.org>> wrote:

Hi Mason,

In Flink 1.14 we have also changed the timeout behavior from checking against the alignment duration to simply checking how old the checkpoint barrier is (so it also accounts for the start delay) [1]. This was done in order to solve problems like the ones you are describing. Unfortunately we can not backport this change to 1.13.x, as it's a breaking change.

Anyway, from our experience I would recommend going all in with unaligned checkpoints, i.e. setting the alignment timeout back to the default value of 0ms. With a non-zero timeout you gain very little (a slightly smaller state size if there is no backpressure, and only slightly because without backpressure, even with the timeout set to 0ms, the amount of captured in-flight data is basically insignificant), while in practice you slow down checkpoint barrier propagation by quite a lot.
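
For reference, a minimal sketch of that recommendation in code, assuming Flink 1.13's CheckpointConfig API (setAlignmentTimeout; later releases rename this) and the interval/timeout values from the configuration table quoted earlier in this thread; the checkpoint path is a placeholder:

```
import java.time.Duration;

import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSetup {
    static StreamExecutionEnvironment configure() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStateBackend(new EmbeddedRocksDBStateBackend());
        env.enableCheckpointing(5 * 60 * 1000, CheckpointingMode.EXACTLY_ONCE); // 5m interval

        CheckpointConfig cc = env.getCheckpointConfig();
        cc.setCheckpointStorage("file:///tmp/checkpoints"); // placeholder path
        cc.setCheckpointTimeout(30 * 60 * 1000);            // 30m
        cc.setMinPauseBetweenCheckpoints(2 * 60 * 1000);    // 2m
        cc.setMaxConcurrentCheckpoints(1);
        cc.setTolerableCheckpointFailureNumber(10);
        cc.enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // Go all in on unaligned checkpoints: enable them, keep the alignment timeout at 0ms.
        cc.enableUnalignedCheckpoints();
        cc.setAlignmentTimeout(Duration.ZERO);
        return env;
    }
}
```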

Best,
Piotrek

[1] https://issues.apache.org/jira/browse/FLINK-23041

wt., 14 gru 2021 o 22:04 Mason Chen <ma...@gmail.com>> napisał(a):
Hi all,

I'm using Flink 1.13 and my job is experiencing high start delay, more so than high alignment time (our FLIP-27 Kafka source is heavily backpressured). Since our alignment timeout is set to 1s, the unaligned checkpoint never triggers, because the alignment delay always stays below the threshold.

It seems there is only a configuration for the alignment timeout, but should there also be one for a start delay timeout? See: https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpointing_under_backpressure/#aligned-checkpoint-timeout

I'm interested to know the reasoning why there isn't a timeout for start delay as well--was it because it was deemed too complex for the user to configure two parameters for unaligned checkpoints?

I'm aware of buffer debloating in 1.14 that could help but I'm trying to see how far unaligned checkpointing can take me.

Best,
Mason






Re: unaligned checkpoint for job with large start delay

Posted by Mason Chen <ma...@apple.com>.
Hi Piotrek,

No worries—I hope you had a good break.

> Counting how many windows have been registered/fired and plotting that over time.
It’s straightforward to count windows that are fired (the trigger exposes its TriggerContext and we can collect the information in that code path). However, it’s not so clear how to count the windows that have been registered, since the window assigner does not expose such a context; is this even the right place to count? It’s not necessarily the case that an assignment results in a new window being registered. Am I missing anything else relevant from the user-facing interface perspective?
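
For the fired-window side, a delegating trigger that counts fire decisions via the TriggerContext metric group can be sketched roughly as below. This is only a sketch under the assumption of Flink 1.13; the class name CountingTrigger and the metric name windowFires are made up for illustration:

```
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;

public class CountingTrigger<T, W extends Window> extends Trigger<T, W> {

    private final Trigger<T, W> delegate;
    private transient Counter fires;

    public CountingTrigger(Trigger<T, W> delegate) {
        this.delegate = delegate;
    }

    // Lazily register the counter on the window operator's metric group and count fires.
    private TriggerResult count(TriggerResult result, TriggerContext ctx) {
        if (fires == null) {
            fires = ctx.getMetricGroup().counter("windowFires");
        }
        if (result.isFire()) {
            fires.inc();
        }
        return result;
    }

    @Override
    public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception {
        return count(delegate.onElement(element, timestamp, window, ctx), ctx);
    }

    @Override
    public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {
        return count(delegate.onEventTime(time, window, ctx), ctx);
    }

    @Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
        return count(delegate.onProcessingTime(time, window, ctx), ctx);
    }

    @Override
    public boolean canMerge() {
        return delegate.canMerge();
    }

    @Override
    public void onMerge(W window, OnMergeContext ctx) throws Exception {
        delegate.onMerge(window, ctx);
    }

    @Override
    public void clear(W window, TriggerContext ctx) throws Exception {
        delegate.clear(window, ctx);
    }
}
```

Wired in with something like .window(EventTimeSessionWindows.withGap(Time.seconds(300))).trigger(new CountingTrigger<>(EventTimeTrigger.create())), large bursts of fires should show up as jumps in the windowFires counter. Counting registered windows would presumably need changes inside the WindowOperator itself, since, as noted above, the assigner has no access to the metric group.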

>  Unfortunately at the moment I don't know how to implement such a metric without affecting performance on the critical path, so I don't see this happening soon :(
Perhaps it can be an opt-in feature? I do see it being really useful, since most users aren’t very familiar with windows, and these metrics would make it easy to identify the common problem of too many windows firing.

The additional metrics would certainly help in diagnosing some of the symptoms of the root problem.

Best,
Mason

> On Jan 10, 2022, at 1:00 AM, Piotr Nowojski <pn...@apache.org> wrote:
> 
> Hi Mason,
> 
> Sorry for a late reply, but I was OoO.
> 
> I think you could confirm it with more custom metrics. Counting how many windows have been registered/fired and plotting that over time.
> 
> I think it would be more helpful in this case to check how long a task has been blocked being "busy" processing for example timers. FLINK-25414 shows only blocked on being hard/soft backpressure. Unfortunately at the moment I don't know how to implement such a metric without affecting performance on the critical path, so I don't see this happening soon :(
> 
> Best,
> Piotrek
> 
> wt., 4 sty 2022 o 18:02 Mason Chen <mason.chen@apple.com <ma...@apple.com>> napisał(a):
> Hi Piotrek,
> 
>> In other words, something (presumably a watermark) has fired more than 151 200 windows at once, which is taking ~1h 10minutes to process and during this time the checkpoint can not make any progress. Is this number of triggered windows plausible in your scenario?
> 
> It seems plausible—there are potentially many keys (and many windows). Is there a way to confirm with metrics? We can add a window fire counter to the window operator that only gets incremented at the end of windows evaluation, in order to see the huge jumps in window fires. I can this benefiting other users who troubleshoot the problem of large number of window firing.
> 
> Best,
> Mason
> 
>> On Dec 29, 2021, at 2:56 AM, Piotr Nowojski <pnowojski@apache.org <ma...@apache.org>> wrote:
>> 
>> Hi Mason,
>> 
>> > and it has to finish processing this output before checkpoint can begin—is this right?
>> 
>> Yes. Checkpoint will be only executed once all triggered windows will be fully processed. 
>> 
>> But from what you have posted it looks like all of that delay is coming from hundreds of thousands of windows firing all at the same time. Between 20:30 and ~21:40 there must have been a bit more than 36 triggers/s * 60s/min * 70min = 151 200triggers fired at once (or in a very short interval). In other words, something (presumably a watermark) has fired more than 151 200 windows at once, which is taking ~1h 10minutes to process and during this time the checkpoint can not make any progress. Is this number of triggered windows plausible in your scenario?
>> 
>> Best,
>> Piotrek
>> 
>> 
>> czw., 23 gru 2021 o 12:12 Mason Chen <mason.chen@apple.com <ma...@apple.com>> napisał(a):
>> Hi Piotr,
>> 
>> Thanks for the thorough response and the PR—will review later.
>> 
>> Clarifications:
>> 1. The flat map you refer to produces at most 1 record.
>> 2. The session window operator’s window process function emits at least 1 record. 
>> 3. The 25 ms sleep is at the beginning of the window process function.
>> 
>> Your explanation about how records being bigger than the buffer size can cause blockage makes sense to me. However, my average record size is around 770 bytes coming out of the source and 960 bytes coming out of the window. Also, we don’t override the default `taskmanager.memory.segment-size`. My Flink job memory config is as follows:
>> 
>> ```
>>         taskmanager.memory.jvm-metaspace.size: 512 mb
>>         taskmanager.memory.jvm-overhead.max: 2Gb
>>         taskmanager.memory.jvm-overhead.min: 512Mb
>>         taskmanager.memory.managed.fraction: '0.4'
>>         taskmanager.memory.network.fraction: '0.2'
>>         taskmanager.memory.network.max: 2Gb
>>         taskmanager.memory.network.min: 200Mb
>>         taskmanager.memory.process.size: 16Gb
>>         taskmanager.numberOfTaskSlots: '4'
>> ```
>> 
>>>  Are you sure your job is making any progress? Are records being processed? Hasn't your job simply deadlocked on something?
>> 
>> To distinguish task blockage vs graceful backpressure, I have checked the operator throughput metrics and have confirmed that during window task buffer blockage, the window operator DOES emit records. Tasks look like they aren’t doing anything but the window is emitting records.
>> 
>> <throughput_metrics.png>
>> 
>> 
>> Furthermore, I created a custom trigger to wrap a metric counter for FIRED counts to get a estimation of how many windows are fired at the same time. I ran a separate job with the same configs—the results look as follows:
>> <trigger_metrics.png>
>> 
>> On average, when the buffers are blocked, there are 36 FIREs per second. Since each of these fires invokes the window process function, 25 ms * 36 = 900 ms means we sleep almost a second cumulatively, per second—which is pretty severe. Combined with the fact that the window process function can emit many records, the task takes even longer to checkpoint since the flatmap/kafka sink is chained with the window operator—and it has to finish processing this output before checkpoint can begin—is this right? In addition, when the window fires per second reduces, checkpoint is able to continue and succeed.
>> 
>> So, I think that the surge of window firing combined with the sleep is the source of the issue, which makes sense. I’m not sure how to confirm whether or not the points about buffer sizes being insufficient for the window output is also interplaying with this issue.
>> 
>> Best,
>> Mason
>> 
>> 
>>> On Dec 22, 2021, at 6:17 AM, Piotr Nowojski <pnowojski@apache.org <ma...@apache.org>> wrote:
>>> 
>>> Hi Mason,
>>> 
>>> One more question. Are you sure your job is making any progress? Are records being processed? Hasn't your job simply deadlocked on something?
>>> 
>>> Best,
>>> Piotrek
>>> 
>>> śr., 22 gru 2021 o 10:02 Piotr Nowojski <pnowojski@apache.org <ma...@apache.org>> napisał(a):
>>> Hi,
>>> 
>>> Thanks for getting back to us. This is indeed weird.
>>> 
>>> >> One of the unaligned checkpoints limitations is that Flink can not snapshot a state of an operator in the middle of processing a record.
>>> >
>>> >This is true for aligned checkpoints too, right?
>>> 
>>> In a sense. For aligned checkpoints there is a stronger limitation, that the task has to process all of the buffered records on the input before it's able to do an aligned checkpoint. For unaligned checkpoints the task has to finish fully processing only the currently processed record.
>>> 
>>> > 1. Why is there high start delay at the source? Isn’t this what FLIP 27 sources are designed to overcome since the need to acquire the checkpoint lock is irrelevant? Is it a bug?
>>> 
>>> Kind of. You do not have to acquire checkpoint lock, as FLIP-27 sources are working in the task thread. But the task thread can not process records and do a checkpoint at the same time. FLIP-27 source will not pick up a next record from the input if there is a backpressure (that allows checkpoint to be triggered while task is back pressured), but this back pressure detection mechanism (or rather mechanism that prevents blocking waits of the task thread when there is a back pressure) is not perfect. A couple of the largests limitations are:
>>> a) If your single record doesn't fit in a single network buffer, for example network buffer default size is 32KB and your record size can reach 33KB, back pressure detection will allow to process next record since there will be some buffer available, but the produced record won't fit into this single buffer and will have to blockingly wait for another buffer to be recycled (increasing start delay and/or alignment time).
>>> b) If you have a flat map style operator/function in the chain, that multiplies the number of records you can hit exactly the same problem. For example, the network buffer is 32KB, record size is 330B, but you have a flat map that suddenly produces 100 records (each 330B). 330B * 100 = 33KB so again you might end up with the task being blocked as a single buffer wouldn't be enough to serialize all of those 100 records. 
>>> c) The same as b), but caused by a timer/watermark triggering WindowOperator to produce lots of records.
>>> 
>>> > 2. When the source operator finished for checkpoint 337, why is start delay high for the window? Barriers should have been forwarded downstream quite quickly unless the window operator is blocking for a few hours...
>>> 
>>> All of those points apply actually to every task, not only FLIP-27 source task and maybe they could explain why the window/flat map task has been blocked for ~2.5h. 
>>> 
>>> Re 1. + 2. If your Window/Flat Map task can block for 6 hours, and your record size is sometimes exceeding network buffer size, this can cause the source task to be blocked for those 6 hours. Source task will be simply stuck waiting for a buffer to be recycled, and this will only happen once a downstream task will process one more buffer. 
>>> 
>>> > 3. If the window is the bottleneck, what are the various ways to confirm this? We have metrics to measure the process function but we don’t know how many windows are getting fired at the same time to give the overall latency for the operator. Are there metrics or logs to see how many windows are getting fired or how long the window operator is blocking the window input buffers from processing?
>>> 
>>> In the webUI the task nodes are colored depending on the busy/backpressured time. You can clearly see that the source is fully backpressured all the time, while the window is constantly busy. I presume your function that is inducing 25ms per record sleep time is chained with the window. That confirms for me that the window task is the bottleneck. However unfortunately there is no easy way to tell how severe this back pressure and for how long those tasks are blocked. In other words, a task that is busy processing records for 1ms every 1ns and a Task that is blocked busy processing a single record for 6h will both have the same 100% Busy metric. Same goes for blocked on the back pressure (both task back pressured for 1ms every 1ns and task back pressured 1h every 1ns will have 100% back pressure metric). Moreover there is currently no way to distinguish if a task is back pressured in a graceful way, without blocking the task thread, or if it is indeed blocking the task thread (due to a), b) or c)). I have created a ticket to add some metrics to help with that [1], but it won't help you right now.
>>> 
>>> I. You could do some estimations on paper if anything that I have written above can theoretically happen. You should know the size of the windows/record sizes/what your Flat Map functions are doing (it seems like you have two of those chained after the WindowOperator?). From the looks of it, 25ms sleep per record, WindowOperator + Flat Map, huge state of window operators might suggest that it's possible.
>>> II.  As those tasks are blocked for hours, triggering a checkpoint and collecting some stack traces can help you understand what the tasks are actually doing. But for that you would need to understand how to differentiate a blocked task, so...
>>> III. ... maybe actually the most efficient way for us to help you would be if you could minimize/simplify your job, replace Kafka source with an artificial source that would be generating records, but in such a way that would still reproduce this behavior and share your code with us?
>>> 
>>> Best, Piotrek
>>> 
>>> [1] https://issues.apache.org/jira/browse/FLINK-25414 <https://issues.apache.org/jira/browse/FLINK-25414>
>>> 
>>> 
>>> wt., 21 gru 2021 o 20:10 Mason Chen <mason.chen@apple.com <ma...@apple.com>> napisał(a):
>>> Hi Piotr,
>>> 
>>> These observations correspond to the 0ms alignment timeout setting.
>>> 
>>> The checkpoints are timeouting because the checkpoint acknowledgement is timing out. Now, we increased the timeout to 3 hours in our checkpoints and we still face errors due to checkpoint acknowledgement—the rest of the checkpoint config is still the same.
>>> 
>>> This is our job graph:
>>> <job_graph.png>
>>> To give more details about the window, we use the default event time trigger with a gap of 300 seconds and 180 allowed lateness. The window only implements the process function in which it emits 1 element.
>>> 
>>> Here are the screenshots of the failed checkpoints. Failures typically come in groups like this. On average, checkpoints complete in 2m 49s.
>>> 
>>> <failed_checkpoint_summary.png>
>>> 
>>> To show a few of the failed checkpoints in more detail:
>>> 
>>> For checkpoint 337, the source finishes checkpoint within a normal latency and the window checkpoint times out due to high start delay.
>>> <checkpoint_337.png>
>>> 
>>> For checkpoint 338, we see very high start delay at the source and blocks the window operator from completing its checkpoint. I sorted by end to end duration for the subtasks to give an idea of the worst start delay. Start delay even show values beyond our checkpoint timeout (e.g. 4, 5, 6 hours).
>>> <checkpoint_338.png>
>>> 
>>> 
>>>> One of the unaligned checkpoints limitations is that Flink can not snapshot a state of an operator in the middle of processing a record.
>>> 
>>> This is true for aligned checkpoints too, right?
>>> 
>>> So my questions are:
>>> 
>>> 1. Why is there high start delay at the source? Isn’t this what FLIP 27 sources are designed to overcome since the need to acquire the checkpoint lock is irrelevant? Is it a bug?
>>> 2. When the source operator finished for checkpoint 337, why is start delay high for the window? Barriers should have been forwarded downstream quite quickly unless the window operator is blocking for a few hours...
>>> 3. If the window is the bottleneck, what are the various ways to confirm this? We have metrics to measure the process function but we don’t know how many windows are getting fired at the same time to give the overall latency for the operator. Are there metrics or logs to see how many windows are getting fired or how long the window operator is blocking the window input buffers from processing?
>>> 
>>> Thanks,
>>> Mason
>>> 
>>> 
>>>> On Dec 20, 2021, at 3:01 AM, Piotr Nowojski <pnowojski@apache.org <ma...@apache.org>> wrote:
>>>> 
>>>> Hi Mason,
>>>> 
>>>> Those checkpoint timeouts (30 minutes) have you already observed with the alignment timeout set to 0ms? Or as you were previously running it with 1s alignment timeout?
>>>> 
>>>> If the latter, it might be because unaligned checkpoints are failing to kick in in the first place. Setting the timeout to 0ms should solve the problem.
>>>> 
>>>> If the former, have you checked why the checkpoints are timeouting? What part of the checkpointing process is taking a long time? For example can you post a screenshot from the WebUI of checkpoint stats for each task? The only explanation I could think of is this sleep time that you added. 25ms per record is really a lot. I mean really a lot. 30 minutes / 25 ms/record = 72 000 records. One of the unaligned checkpoints limitations is that Flink can not snapshot a state of an operator in the middle of processing a record. In your particular case, Flink will not be able to snapshot the state of the session window operator in the middle of the windows being fired. If your window operator is firing a lot of windows at the same time, or a single window is producing 72k of records (which would be an unusual but not unimaginable amount), this could block checkpointing of the window operator for 30 minutes due to this 25ms sleep down the stream.
>>>> 
>>>> Piotrek
>>>> 
>>>> pt., 17 gru 2021 o 19:19 Mason Chen <mason.chen@apple.com <ma...@apple.com>> napisał(a):
>>>> Hi Piotr,
>>>> 
>>>> Thanks for the link to the JIRA ticket, we actually don’t see much state size overhead between checkpoints in aligned vs unaligned, so we will go with your recommendation of using unaligned checkpoints with 0s alignment timeout.
>>>> 
>>>> For context, we are testing unaligned checkpoints with our application with these tasks: [kafka source, map, filter] -> keyby -> [session window] -> [various kafka sinks]. The first task has parallelism 40 and the rest of the tasks have parallelism 240. This is the FLIP 27 Kafka source.
>>>> 
>>>> We added an artificial sleep (25 ms per invocation of in process function) the session window task to simulate backpressure; however, we still see checkpoints failing due to task acknowledgement doesn’t complete within our checkpoint timeout (30 minutes).
>>>> 
>>>> I am able to correlate that the input buffers from window and output buffers from source being 100% usage corresponds to the checkpoint failures. When they are not full (input can drop to as low as 60% usage and output can drop to as low as 55% usage), the checkpoints succeed within less than 2 ms. In all cases, it is the session window task or source task failing to 100% acknowledge the barriers within timeout. I do see the source task acknowledgement taking long in some of the failures (e.g. 20 minutes, 30 minutes, 50 minutes, 1 hour, 2 hours) and source is idle and not busy at this time.
>>>> 
>>>> All other input buffers are low usage (mostly 0). For output buffer, the usage is around 50% for window--everything else is near 0% all the time except the source mentioned before (makes sense since rest are just sinks).
>>>> 
>>>> We are also running a parallel Flink job with the same configurations, except with unaligned checkpoints disabled. Here we see observe the same behavior except now some of the checkpoints are failing due to the source task not acknowledging everything within timeout—however, most failures are still due to session window acknowledgement.
>>>> 
>>>> All the data seems to points an issue with the source? Now, I don’t know how to explain this behavior since unaligned checkpoints should overtake records in the buffers (once seen at the input buffer, forward immediately downstream to output buffer).
>>>> 
>>>> Just to confirm, this is our checkpoint configuration:
>>>> ```
>>>> Option
>>>> Value
>>>> Checkpointing Mode	Exactly Once
>>>> Checkpoint Storage	FileSystemCheckpointStorage
>>>> State Backend	EmbeddedRocksDBStateBackend
>>>> Interval	5m 0s
>>>> Timeout	30m 0s
>>>> Minimum Pause Between Checkpoints	2m 0s
>>>> Maximum Concurrent Checkpoints	1
>>>> Unaligned Checkpoints	Enabled
>>>> Persist Checkpoints Externally	Enabled (retain on cancellation)
>>>> Tolerable Failed Checkpoints	10
>>>> ```
>>>> 
>>>> Are there other metrics should I look at—why else should tasks fail acknowledgement in unaligned mode? Is it something about the implementation details of window function that I am not considering? My main hunch is something to do with the source.
>>>> 
>>>> Best,
>>>> Mason
>>>> 
>>>>> On Dec 16, 2021, at 12:25 AM, Piotr Nowojski <pnowojski@apache.org <ma...@apache.org>> wrote:
>>>>> 
>>>>> Hi Mason,
>>>>> 
>>>>> In Flink 1.14 we have also changed the timeout behavior from checking against the alignment duration, to simply checking how old is the checkpoint barrier (so it would also account for the start delay) [1]. It was done in order to solve problems as you are describing. Unfortunately we can not backport this change to 1.13.x as it's a breaking change.
>>>>> 
>>>>> Anyway, from our experience I would recommend going all in with the unaligned checkpoints, so setting the timeout back to the default value of 0ms. With timeouts you are gaining very little (a tiny bit smaller state size if there is no backpressure - tiny bit because without backpressure, even with timeout set to 0ms, the amount of captured inflight data is basically insignificant), while in practise you slow down the checkpoint barriers propagation time by quite a lot.
>>>>> 
>>>>> Best,
>>>>> Piotrek
>>>>> 
>>>>> [1] https://issues.apache.org/jira/browse/FLINK-23041 <https://issues.apache.org/jira/browse/FLINK-23041>
>>>>> wt., 14 gru 2021 o 22:04 Mason Chen <mas.chen6345@gmail.com <ma...@gmail.com>> napisał(a):
>>>>> Hi all,
>>>>> 
>>>>> I'm using Flink 1.13 and my job is experiencing high start delay, more so than high alignment time. (our flip 27 kafka source is heavily backpressured). Since our alignment timeout is set to 1s, the unaligned checkpoint never triggers since alignment delay is always below the threshold.
>>>>> 
>>>>> It's seems there is only a configuration for alignment timeout but should there also be one for start delay timeout: https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpointing_under_backpressure/#aligned-checkpoint-timeout <https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpointing_under_backpressure/#aligned-checkpoint-timeout>
>>>>> 
>>>>> I'm interested to know the reasoning why there isn't a timeout for start delay as well--was it because it was deemed too complex for the user to configure two parameters for unaligned checkpoints?
>>>>> 
>>>>> I'm aware of buffer debloating in 1.14 that could help but I'm trying to see how far unaligned checkpointing can take me.
>>>>> 
>>>>> Best,
>>>>> Mason
>>>> 
>>> 
>> 
> 


Re: unaligned checkpoint for job with large start delay

Posted by Piotr Nowojski <pn...@apache.org>.
Hi Mason,

Sorry for a late reply, but I was OoO.

I think you could confirm it with more custom metrics. Counting how many
windows have been registered/fired and plotting that over time.

I think it would be more helpful in this case to check how long a task has
been blocked being "busy" processing, for example, timers. FLINK-25414 only
covers time spent blocked on hard/soft backpressure. Unfortunately, at the
moment I don't know how to implement such a metric without affecting
performance on the critical path, so I don't see this happening soon :(

Best,
Piotrek

wt., 4 sty 2022 o 18:02 Mason Chen <ma...@apple.com> napisał(a):

> Hi Piotrek,
>
> In other words, something (presumably a watermark) has fired more than 151
> 200 windows at once, which is taking ~1h 10minutes to process and during
> this time the checkpoint can not make any progress. Is this number of
> triggered windows plausible in your scenario?
>
>
> It seems plausible—there are potentially many keys (and many windows). Is
> there a way to confirm with metrics? We can add a window fire counter to
> the window operator that only gets incremented at the end of windows
> evaluation, in order to see the huge jumps in window fires. I can this
> benefiting other users who troubleshoot the problem of large number of
> window firing.
>
> Best,
> Mason
>
> On Dec 29, 2021, at 2:56 AM, Piotr Nowojski <pn...@apache.org> wrote:
>
> Hi Mason,
>
> > and it has to finish processing this output before checkpoint can
> begin—is this right?
>
> Yes. Checkpoint will be only executed once all triggered windows will be
> fully processed.
>
> But from what you have posted it looks like all of that delay is
> coming from hundreds of thousands of windows firing all at the same time.
> Between 20:30 and ~21:40 there must have been a bit more than 36 triggers/s
> * 60s/min * 70min = 151 200triggers fired at once (or in a very short
> interval). In other words, something (presumably a watermark) has fired
> more than 151 200 windows at once, which is taking ~1h 10minutes to process
> and during this time the checkpoint can not make any progress. Is this
> number of triggered windows plausible in your scenario?
>
> Best,
> Piotrek
>
>
> czw., 23 gru 2021 o 12:12 Mason Chen <ma...@apple.com> napisał(a):
>
>> Hi Piotr,
>>
>> Thanks for the thorough response and the PR—will review later.
>>
>> Clarifications:
>> 1. The flat map you refer to produces at most 1 record.
>> 2. The session window operator’s *window process function* emits at
>> least 1 record.
>> 3. The 25 ms sleep is at the beginning of the window process function.
>>
>> Your explanation about how records being bigger than the buffer size can
>> cause blockage makes sense to me. However, my average record size is around 770
>> bytes coming out of the source and 960 bytes coming out of the window.
>> Also, we don’t override the default `taskmanager.memory.segment-size`. My
>> Flink job memory config is as follows:
>>
>> ```
>> taskmanager.memory.jvm-metaspace.size: 512 mb
>> taskmanager.memory.jvm-overhead.max: 2Gb
>> taskmanager.memory.jvm-overhead.min: 512Mb
>> taskmanager.memory.managed.fraction: '0.4'
>> taskmanager.memory.network.fraction: '0.2'
>> taskmanager.memory.network.max: 2Gb
>> taskmanager.memory.network.min: 200Mb
>> taskmanager.memory.process.size: 16Gb
>> taskmanager.numberOfTaskSlots: '4'
>> ```
>>
>>  Are you sure your job is making any progress? Are records being
>> processed? Hasn't your job simply deadlocked on something?
>>
>>
>> To distinguish task blockage vs graceful backpressure, I have checked the
>> operator throughput metrics and have confirmed that during window *task*
>> buffer blockage, the window *operator* DOES emit records. Tasks look
>> like they aren’t doing anything but the window is emitting records.
>>
>> <throughput_metrics.png>
>>
>>
>> Furthermore, I created a custom trigger to wrap a metric counter for
>> FIRED counts to get a estimation of how many windows are fired at the same
>> time. I ran a separate job with the same configs—the results look as
>> follows:
>> <trigger_metrics.png>
>>
>> On average, when the buffers are blocked, there are 36 FIREs per second.
>> Since each of these fires invokes the window process function, 25 ms * 36 =
>> 900 ms means we sleep almost a second cumulatively, per second—which is
>> pretty severe. Combined with the fact that the window process function can
>> emit many records, the task takes even longer to checkpoint since the
>> flatmap/kafka sink is chained with the window operator—and it has to finish
>> processing this output before checkpoint can begin—*is this right?* In
>> addition, when the window fires per second reduces, checkpoint is able to
>> continue and succeed.
>>
>> So, I think that the surge of window firing combined with the sleep is
>> the source of the issue, which makes sense. I’m not sure how to confirm
>> whether or not the points about buffer sizes being insufficient for the
>> window output is also interplaying with this issue.
>>
>> Best,
>> Mason
>>
>>
>> On Dec 22, 2021, at 6:17 AM, Piotr Nowojski <pn...@apache.org> wrote:
>>
>> Hi Mason,
>>
>> One more question. Are you sure your job is making any progress? Are
>> records being processed? Hasn't your job simply deadlocked on something?
>>
>> Best,
>> Piotrek
>>
>> śr., 22 gru 2021 o 10:02 Piotr Nowojski <pn...@apache.org>
>> napisał(a):
>>
>>> Hi,
>>>
>>> Thanks for getting back to us. This is indeed weird.
>>>
>>> >> One of the unaligned checkpoints limitations is that Flink can not
>>> snapshot a state of an operator in the middle of processing a record.
>>> >
>>> >This is true for aligned checkpoints too, right?
>>>
>>> In a sense. For aligned checkpoints there is a stronger limitation, that
>>> the task has to process all of the buffered records on the input before
>>> it's able to do an aligned checkpoint. For unaligned checkpoints the task
>>> has to finish fully processing only the currently processed record.
>>>
>>> > 1. Why is there high start delay at the source? Isn’t this what FLIP
>>> 27 sources are designed to overcome since the need to acquire the
>>> checkpoint lock is irrelevant? Is it a bug?
>>>
>>> Kind of. You do not have to acquire checkpoint lock, as FLIP-27 sources
>>> are working in the task thread. But the task thread can not process records
>>> and do a checkpoint at the same time. FLIP-27 source will not pick up a
>>> next record from the input if there is a backpressure (that allows
>>> checkpoint to be triggered while task is back pressured), but this back
>>> pressure detection mechanism (or rather mechanism that prevents blocking
>>> waits of the task thread when there is a back pressure) is not perfect. A
>>> couple of the largests limitations are:
>>> a) If your single record doesn't fit in a single network buffer, for
>>> example network buffer default size is 32KB and your record size can reach
>>> 33KB, back pressure detection will allow to process next record since there
>>> will be some buffer available, but the produced record won't fit into this
>>> single buffer and will have to blockingly wait for another buffer to be
>>> recycled (increasing start delay and/or alignment time).
>>> b) If you have a flat map style operator/function in the chain, that
>>> multiplies the number of records you can hit exactly the same problem. For
>>> example, the network buffer is 32KB, record size is 330B, but you have a
>>> flat map that suddenly produces 100 records (each 330B). 330B * 100 = 33KB
>>> so again you might end up with the task being blocked as a single buffer
>>> wouldn't be enough to serialize all of those 100 records.
>>> c) The same as b), but caused by a timer/watermark triggering
>>> WindowOperator to produce lots of records.
>>>
>>> > 2. When the source operator finished for checkpoint 337, why is start
>>> delay high for the window? Barriers should have been forwarded downstream
>>> quite quickly unless the window operator is blocking for a few hours...
>>>
>>> All of those points apply actually to every task, not only FLIP-27
>>> source task and maybe they could explain why the window/flat map task has
>>> been blocked for ~2.5h.
>>>
>>> Re 1. + 2. If your Window/Flat Map task can block for 6 hours, and your
>>> record size is sometimes exceeding network buffer size, this can cause the
>>> source task to be blocked for those 6 hours. Source task will be simply
>>> stuck waiting for a buffer to be recycled, and this will only happen once a
>>> downstream task will process one more buffer.
>>>
>>> > 3. If the window is the bottleneck, what are the various ways to
>>> confirm this? We have metrics to measure the process function but we don’t
>>> know how many windows are getting fired at the same time to give the
>>> overall latency for the operator. Are there metrics or logs to see how many
>>> windows are getting fired or how long the window operator is blocking the
>>> window input buffers from processing?
>>>
>>> In the webUI the task nodes are colored depending on the
>>> busy/backpressured time. You can clearly see that the source is fully
>>> backpressured all the time, while the window is constantly busy. I presume
>>> your function that is inducing 25ms per record sleep time is chained with
>>> the window. That confirms for me that the window task is the bottleneck.
>>> However unfortunately there is no easy way to tell how severe this back
>>> pressure and for how long those tasks are blocked. In other words, a task
>>> that is busy processing records for 1ms every 1ns and a Task that is
>>> blocked busy processing a single record for 6h will both have the same 100%
>>> Busy metric. Same goes for blocked on the back pressure (both task back
>>> pressured for 1ms every 1ns and task back pressured 1h every 1ns will have
>>> 100% back pressure metric). Moreover there is currently no way to
>>> distinguish if a task is back pressured in a graceful way, without blocking
>>> the task thread, or if it is indeed blocking the task thread (due to a), b)
>>> or c)). I have created a ticket to add some metrics to help with that [1],
>>> but it won't help you right now.
>>>
>>> I. You could do some estimations on paper if anything that I have
>>> written above can theoretically happen. You should know the size of the
>>> windows/record sizes/what your Flat Map functions are doing (it seems like
>>> you have two of those chained after the WindowOperator?). From the looks of
>>> it, 25ms sleep per record, WindowOperator + Flat Map, huge state of window
>>> operators might suggest that it's possible.
>>> II.  As those tasks are blocked for hours, triggering a checkpoint and
>>> collecting some stack traces can help you understand what the tasks are
>>> actually doing. But for that you would need to understand how to
>>> differentiate a blocked task, so...
>>> *III. ... maybe actually the most efficient way for us to help you would
>>> be if you could minimize/simplify your job, replace Kafka source with an
>>> artificial source that would be generating records, but in such a way that
>>> would still reproduce this behavior and share your code with us?*
>>>
>>> Best, Piotrek
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-25414
>>>
>>>
>>> wt., 21 gru 2021 o 20:10 Mason Chen <ma...@apple.com> napisał(a):
>>>
>>>> Hi Piotr,
>>>>
>>>> These observations correspond to the 0ms alignment timeout setting.
>>>>
>>>> The checkpoints are timeouting because the checkpoint acknowledgement
>>>> is timing out. Now, we increased the timeout to 3 hours in our checkpoints
>>>> and we still face errors due to checkpoint acknowledgement—the rest of the
>>>> checkpoint config is still the same.
>>>>
>>>> This is our job graph:
>>>> <job_graph.png>
>>>> To give more details about the window, we use the default event time
>>>> trigger with a gap of 300 seconds and 180 allowed lateness. The window only
>>>> implements the process function in which it emits 1 element.
>>>>
>>>> Here are the screenshots of the failed checkpoints. Failures typically
>>>> come in groups like this. On average, checkpoints complete in 2m 49s.
>>>>
>>>> <failed_checkpoint_summary.png>
>>>>
>>>> To show a few of the failed checkpoints in more detail:
>>>>
>>>> For checkpoint 337, the source finishes checkpoint within a normal
>>>> latency and the window checkpoint times out due to high start delay.
>>>> <checkpoint_337.png>
>>>>
>>>> For checkpoint 338, we see very high start delay at the source and
>>>> blocks the window operator from completing its checkpoint. I sorted by end
>>>> to end duration for the subtasks to give an idea of the worst start delay.
>>>> Start delay even show values beyond our checkpoint timeout (e.g. 4, 5, 6
>>>> hours).
>>>> <checkpoint_338.png>
>>>>
>>>>
>>>> One of the unaligned checkpoints limitations is that Flink can not
>>>> snapshot a state of an operator in the middle of processing a record.
>>>>
>>>> This is true for aligned checkpoints too, right?
>>>>
>>>> So my questions are:
>>>>
>>>> 1. Why is there high start delay at the source? Isn’t this what FLIP 27
>>>> sources are designed to overcome since the need to acquire the checkpoint
>>>> lock is irrelevant? Is it a bug?
>>>> 2. When the source operator finished for checkpoint 337, why is start
>>>> delay high for the window? Barriers should have been forwarded downstream
>>>> quite quickly unless the window operator is blocking for a few hours...
>>>> 3. If the window is the bottleneck, what are the various ways to
>>>> confirm this? We have metrics to measure the process function but we don’t
>>>> know how many windows are getting fired at the same time to give the
>>>> overall latency for the operator. Are there metrics or logs to see how many
>>>> windows are getting fired or how long the window operator is blocking the
>>>> window input buffers from processing?
>>>>
>>>> Thanks,
>>>> Mason
>>>>
>>>>
>>>> On Dec 20, 2021, at 3:01 AM, Piotr Nowojski <pn...@apache.org>
>>>> wrote:
>>>>
>>>> Hi Mason,
>>>>
>>>> Those checkpoint timeouts (30 minutes) have you already observed with
>>>> the alignment timeout set to 0ms? Or as you were previously running it with
>>>> 1s alignment timeout?
>>>>
>>>> If the latter, it might be because unaligned checkpoints are failing to
>>>> kick in in the first place. Setting the timeout to 0ms should solve the
>>>> problem.
>>>>
>>>> If the former, have you checked why the checkpoints are timeouting?
>>>> What part of the checkpointing process is taking a long time? For example
>>>> can you post a screenshot from the WebUI of checkpoint stats for each task?
>>>> The only explanation I could think of is this sleep time that you added.
>>>> 25ms per record is really a lot. I mean really a lot. 30 minutes / 25
>>>> ms/record = 72 000 records. One of the unaligned checkpoints limitations is
>>>> that Flink can not snapshot a state of an operator in the middle of
>>>> processing a record. In your particular case, Flink will not be able to
>>>> snapshot the state of the session window operator in the middle of the
>>>> windows being fired. If your window operator is firing a lot of windows at
>>>> the same time, or a single window is producing 72k of records (which would
>>>> be an unusual but not unimaginable amount), this could block checkpointing
>>>> of the window operator for 30 minutes due to this 25ms sleep down the
>>>> stream.
>>>>
>>>> Piotrek
>>>>
>>>> pt., 17 gru 2021 o 19:19 Mason Chen <ma...@apple.com> napisał(a):
>>>>
>>>>> Hi Piotr,
>>>>>
>>>>> Thanks for the link to the JIRA ticket, we actually don’t see much
>>>>> state size overhead between checkpoints in aligned vs unaligned, so we will
>>>>> go with your recommendation of using unaligned checkpoints with 0s
>>>>> alignment timeout.
>>>>>
>>>>> For context, we are testing unaligned checkpoints with our application
>>>>> with these tasks: [kafka source, map, filter] -> keyby -> [session window]
>>>>> -> [various kafka sinks]. The first task has parallelism 40 and the rest of
>>>>> the tasks have parallelism 240. This is the FLIP 27 Kafka source.
>>>>>
>>>>> We added an artificial sleep (25 ms per invocation of in process
>>>>> function) the session window task to simulate backpressure; however, we
>>>>> still see checkpoints failing due to task acknowledgement doesn’t complete
>>>>> within our checkpoint timeout (30 minutes).
>>>>>
>>>>> I am able to correlate that the input buffers from *window* and
>>>>> output buffers from *source* being 100% usage corresponds to the
>>>>> checkpoint failures. When they are not full (input can drop to as low as
>>>>> 60% usage and output can drop to as low as 55% usage), the checkpoints
>>>>> succeed within less than 2 ms. In all cases, it is the session window task
>>>>> or source task failing to 100% acknowledge the barriers within timeout. I
>>>>> do see the *source* task acknowledgement taking long in some of the
>>>>> failures (e.g. 20 minutes, 30 minutes, 50 minutes, 1 hour, 2 hours) and
>>>>> source is idle and not busy at this time.
>>>>>
>>>>> All other input buffers are low usage (mostly 0). For output buffer,
>>>>> the usage is around 50% for window--everything else is near 0% all the time
>>>>> except the source mentioned before (makes sense since rest are just sinks).
>>>>>
>>>>> We are also running a parallel Flink job with the same configurations,
>>>>> except with unaligned checkpoints disabled. Here we see observe the same
>>>>> behavior except now some of the checkpoints are failing due to the source
>>>>> task not acknowledging everything within timeout—however, most failures are
>>>>> still due to session window acknowledgement.
>>>>>
>>>>> All the data seems to points an issue with the source? Now, I don’t
>>>>> know how to explain this behavior since unaligned checkpoints should
>>>>> overtake records in the buffers (once seen at the input buffer, forward
>>>>> immediately downstream to output buffer).
>>>>>
>>>>> Just to confirm, this is our checkpoint configuration:
>>>>> ```
>>>>> Option
>>>>> Value
>>>>> Checkpointing Mode Exactly Once
>>>>> Checkpoint Storage FileSystemCheckpointStorage
>>>>> State Backend EmbeddedRocksDBStateBackend
>>>>> Interval 5m 0s
>>>>> Timeout 30m 0s
>>>>> Minimum Pause Between Checkpoints 2m 0s
>>>>> Maximum Concurrent Checkpoints 1
>>>>> Unaligned Checkpoints Enabled
>>>>> Persist Checkpoints Externally Enabled (retain on cancellation)
>>>>> Tolerable Failed Checkpoints 10
>>>>> ```
>>>>>
>>>>> Are there other metrics should I look at—why else should tasks fail
>>>>> acknowledgement in unaligned mode? Is it something about the implementation
>>>>> details of window function that I am not considering? My main hunch is
>>>>> something to do with the source.
>>>>>
>>>>> Best,
>>>>> Mason
>>>>>
>>>>> On Dec 16, 2021, at 12:25 AM, Piotr Nowojski <pn...@apache.org>
>>>>> wrote:
>>>>>
>>>>> Hi Mason,
>>>>>
>>>>> In Flink 1.14 we have also changed the timeout behavior from checking
>>>>> against the alignment duration, to simply checking how old is the
>>>>> checkpoint barrier (so it would also account for the start delay) [1]. It
>>>>> was done in order to solve problems as you are describing. Unfortunately we
>>>>> can not backport this change to 1.13.x as it's a breaking change.
>>>>>
>>>>> Anyway, from our experience I would recommend going all in with the
>>>>> unaligned checkpoints, so setting the timeout back to the default value of
>>>>> 0ms. With timeouts you are gaining very little (a tiny bit smaller state
>>>>> size if there is no backpressure - tiny bit because without backpressure,
>>>>> even with timeout set to 0ms, the amount of captured inflight data is
>>>>> basically insignificant), while in practise you slow down the checkpoint
>>>>> barriers propagation time by quite a lot.
>>>>>
>>>>> Best,
>>>>> Piotrek
>>>>>
>>>>> [1] https://issues.apache.org/jira/browse/FLINK-23041
>>>>>
>>>>> wt., 14 gru 2021 o 22:04 Mason Chen <ma...@gmail.com>
>>>>> napisał(a):
>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> I'm using Flink 1.13 and my job is experiencing high start delay,
>>>>>> more so than high alignment time. (our flip 27 kafka source is heavily
>>>>>> backpressured). Since our alignment timeout is set to 1s, the unaligned
>>>>>> checkpoint never triggers since alignment delay is always below the
>>>>>> threshold.
>>>>>>
>>>>>> It's seems there is only a configuration for alignment timeout but
>>>>>> should there also be one for start delay timeout:
>>>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpointing_under_backpressure/#aligned-checkpoint-timeout
>>>>>>
>>>>>> I'm interested to know the reasoning why there isn't a timeout for
>>>>>> start delay as well--was it because it was deemed too complex for the user
>>>>>> to configure two parameters for unaligned checkpoints?
>>>>>>
>>>>>> I'm aware of buffer debloating in 1.14 that could help but I'm trying
>>>>>> to see how far unaligned checkpointing can take me.
>>>>>>
>>>>>> Best,
>>>>>> Mason
>>>>>>
>>>>>
>>>>>
>>>>
>>
>