Posted to user@flink.apache.org by Hubert Chen <hu...@gmail.com> on 2020/08/26 16:06:11 UTC

Failures due to inevitable high backpressure

Hello,

My Flink application has entered a bad state, and I was wondering if I
could get some advice on how to resolve the issue.

The sequence of events that led to the bad state:

1. A failure occurs (e.g., TM timeout) within the cluster
2. The application successfully recovers from the last completed checkpoint
3. The application consumes events from Kafka as quickly as it can. This
introduces high backpressure.
4. A checkpoint is triggered
5. Another failure occurs (e.g., TM timeout, checkpoint timeout, Kafka
transaction timeout) and the application loops back to step #2. This
creates a vicious cycle where no progress is made.

I believe the underlying issue is that the application is experiencing high
backpressure. This can cause the TM to fail to respond to heartbeats, or
cause long checkpoint durations because processing of the checkpoint is
delayed.

I'm unsure about the best next steps to take. How do I ensure that
heartbeats and checkpoints complete successfully when experiencing
inevitable high backpressure?

Best,
Hubert

Re: Failures due to inevitable high backpressure

Posted by Arvid Heise <ar...@ververica.com>.
Hi Hubert,

The most straightforward reason for backpressure is under-provisioning of
the cluster. An application usually needs gradually more resources over
time. If the user base of your company grows, so does the number of
messages (be it click streams, page impressions, or any kind of
transactions). Often, the operator state grows as well. Sometimes the
events themselves simply become more complex, so you need more overall
bandwidth. This means that from time to time you need to increase the
memory of Flink (for state) or the number of compute nodes (to handle more
events). In the same way, you need to make sure that your sink scales as
well.

If you fail to keep up with demand, the application gradually becomes
more unstable, and at some point you see the vicious cycle, where the
system does not even catch up during off-hours, when the number of events
is small.

First, it's important to understand what the bottleneck is. The web UI
should help to narrow it down quickly.
Second, if a TM becomes unresponsive, chances are high that it ran out of
memory (on- or off-heap). So it might be enough to increase memory. In any
case, I'd expect one of the TM logs to show an exception. You could also
profile the GC time of the TMs.
Third, you might also want to check your state size. If it grows over time,
there might be some kind of leak (logic errors are also common, where too
much is held in state and never evicted).
Fourth, closely monitor how the application behaves during recovery. Is it
making progress at all, or stalling at the same point?
Fifth, it might be worthwhile to temporarily add a compute node to the
cluster, just to get everything running again, and then remove it. If you
now have 2 days of data to reprocess in order to catch up, even the
aforementioned tweaks may not be enough.
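
To put rough numbers on the catch-up problem in the last point: while the
job works through its backlog, new events keep arriving, so only the
capacity above the live ingest rate actually drains the backlog. The Java
sketch below is a back-of-the-envelope estimate under that assumption, and
it further assumes the backlog built up at the same average ingest rate.
The class name, method name, and example rates are made up for
illustration and do not come from this thread.

```java
/** Back-of-the-envelope estimate of how long a backlogged job needs to catch up. */
final class CatchUpEstimate {

    /**
     * @param backlogHours   hours of unprocessed input (e.g. 48 for two days)
     * @param ingestPerSec   average rate at which new events arrive
     * @param capacityPerSec sustained rate at which the job can process events
     * @return hours until the backlog is drained, or infinity if it never is
     */
    static double catchUpHours(double backlogHours, double ingestPerSec, double capacityPerSec) {
        // Only the throughput left over after handling live traffic drains the backlog.
        double spare = capacityPerSec - ingestPerSec;
        if (spare <= 0) {
            return Double.POSITIVE_INFINITY;
        }
        // Backlog in events is backlogHours * 3600 * ingestPerSec; dividing by
        // spare events per second and converting back to hours cancels the 3600.
        return backlogHours * ingestPerSec / spare;
    }

    public static void main(String[] args) {
        System.out.println(catchUpHours(48, 1_000, 1_500)); // 96.0
        System.out.println(catchUpHours(48, 1_000, 3_000)); // 24.0
    }
}
```

With only 50% headroom, a two-day backlog takes about four days to clear,
which is why temporarily adding capacity can matter more than tuning.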

Best,

Arvid

-- 

Arvid Heise | Senior Java Developer

Re: Failures due to inevitable high backpressure

Posted by David Anderson <da...@alpinegizmo.com>.
One other thought: some users experiencing this have found it preferable to
increase the checkpoint timeout to the point where it is effectively
infinite. Checkpoints that can't time out are likely to complete
eventually, which is better than landing in the vicious cycle you described.
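
As a minimal sketch of that suggestion, the checkpoint timeout can be
raised through the job's CheckpointConfig. The class name, the one-minute
checkpoint interval, and the 24-hour timeout are illustrative assumptions,
not values recommended in this thread, and the snippet needs the Flink
streaming dependency on the classpath.

```java
import java.time.Duration;

import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

final class LongCheckpointTimeout {

    /** Applies a very long checkpoint timeout to the given environment. */
    static void configure(StreamExecutionEnvironment env) {
        // Trigger a checkpoint every minute (illustrative value).
        env.enableCheckpointing(Duration.ofMinutes(1).toMillis());

        CheckpointConfig checkpoints = env.getCheckpointConfig();
        // The timeout is given in milliseconds. 24 hours is an assumed
        // stand-in for "effectively infinite"; pick a value that fits your job.
        checkpoints.setCheckpointTimeout(Duration.ofHours(24).toMillis());
    }
}
```

Call configure(env) on the StreamExecutionEnvironment before building and
executing the job.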

David

Re: Failures due to inevitable high backpressure

Posted by David Anderson <da...@alpinegizmo.com>.
You should begin by trying to identify the cause of the backpressure,
because the appropriate fix depends on the details.

Possible causes that I have seen include:

- the job is inadequately provisioned
- blocking I/O is being done in a user function
- a huge number of timers are firing simultaneously
- event time skew between different sources is causing large amounts of
state to be buffered
- data skew (a hot key) is overwhelming one subtask or slot
- external systems can't keep up (e.g., a sink)
- lengthy GC pauses caused by running lots of slots per TM with the
FsStateBackend
- contention for critical resources (e.g., using a NAS as the local disk
for RocksDB)
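
For the data skew item in the list above, one rough way to spot a hot key
is to compare the busiest subtask of an operator against the average. The
Java sketch below assumes you have already collected one record count per
parallel subtask (for example from the per-subtask record counts shown in
the web UI). The class name, the sample counts, and any threshold you apply
to the ratio are illustrative assumptions.

```java
import java.util.Arrays;

/** Rough hot-key check: how much busier is the busiest subtask than the average? */
final class SkewCheck {

    /** Returns max / mean of the per-subtask record counts (1.0 means perfectly even). */
    static double skewRatio(long[] recordsPerSubtask) {
        if (recordsPerSubtask.length == 0) {
            throw new IllegalArgumentException("need at least one subtask");
        }
        long max = Arrays.stream(recordsPerSubtask).max().getAsLong();
        double mean = Arrays.stream(recordsPerSubtask).average().getAsDouble();
        // No records anywhere means nothing is skewed.
        return mean == 0.0 ? 1.0 : max / mean;
    }

    public static void main(String[] args) {
        // Hypothetical counts for an operator with parallelism 4.
        long[] counts = {1_000, 1_100, 900, 9_000};
        System.out.println(skewRatio(counts)); // 3.0
    }
}
```

A ratio close to 1 suggests an even distribution. A ratio of several times
the mean points at one subtask doing most of the work.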

Unaligned checkpoints [1], new in Flink 1.11, should address this problem
in some cases, depending on the root cause. But first you should try to
figure out why you have high backpressure, because a number of the causes
listed above won't be helped by changing to unaligned checkpoints.
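
If the root cause turns out to be one that unaligned checkpoints can help
with, enabling them is a small change to the job's checkpoint
configuration. The sketch below assumes Flink 1.11 or later and the Flink
streaming dependency on the classpath. The class name and the one-minute
interval are illustrative.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

final class UnalignedCheckpoints {

    /** Turns on checkpointing with unaligned checkpoints for the given environment. */
    static void configure(StreamExecutionEnvironment env) {
        // Checkpoint every 60 seconds (illustrative value); the default
        // checkpointing mode is exactly-once.
        env.enableCheckpointing(60_000L);
        // Unaligned checkpoints were introduced as a beta feature in Flink 1.11.
        env.getCheckpointConfig().enableUnalignedCheckpoints();
    }
}
```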

Best,
David

[1]
https://flink.apache.org/news/2020/07/06/release-1.11.0.html#unaligned-checkpoints-beta
