Posted to user@flink.apache.org by "Slotterback, Chris" <Ch...@comcast.com> on 2020/06/02 00:35:10 UTC

Re: [EXTERNAL] Re: Inconsistent checkpoint durations vs state size

Congxian,


1. The checkpoints were failing with this exception scattered through the logs:
2020-06-01 21:04:37,930 WARN  org.apache.hadoop.hdfs.DataStreamer - DataStreamer Exception
org.apache.hadoop.ipc.RemoteException(java.io.IOException): File /flink/flink-checkpoints/ade55daec06ee72aaf7ceade86c6e7a9/chk-1/2093792d-7ebb-4008-8e20-4daf1849c2d4 could only be replicated to 0 nodes instead of minReplication (=1).

2. Yes, we are using incremental checkpointing
3. Currently our windows are configured to use the process function (we were doing aggregates before), which, as I understand it, should make our state update/insert ratio lower, since we build up the ListStates of each window over time and process them on trigger (see the sketch below).
4. We set the max concurrent checkpoints back to 1; it was originally configured that way, but the checkpoints were taking too long to complete before the next checkpoint interval began.

Our TMs normally have 3 slots (3_slots.png). We tried running with 1 slot per TM (1_slot.png) and noticed the checkpoint times fell drastically, but with 1 slot per TM our parallelism had to be dropped and our consumer lag was growing.
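
(For context, here is a minimal sketch of the window setup in point 3. The String element type, the count, and the class name are stand-ins for our real report type and per-window processing. A window driven by a plain ProcessWindowFunction like this buffers every element for the key/window in list state until the trigger fires, so state is mostly appended to rather than updated.)

    import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;

    // Sketch only: String elements and a count stand in for the real report type and processing.
    public class ReportCountWindowFunction
            extends ProcessWindowFunction<String, Long, String, TimeWindow> {

        @Override
        public void process(String key,
                            Context ctx,
                            Iterable<String> reports,   // everything buffered for this key/window
                            Collector<Long> out) {
            long count = 0;
            for (String ignored : reports) {
                count++;                                // stand-in for the real per-window work
            }
            out.collect(count);
        }
    }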



From: Congxian Qiu <qc...@gmail.com>
Date: Friday, May 29, 2020 at 10:59 PM
To: "Slotterback, Chris" <Ch...@comcast.com>
Cc: "user@flink.apache.org" <us...@flink.apache.org>
Subject: [EXTERNAL] Re: Inconsistent checkpoint durations vs state size

Hi
From the given picture,
1. There were some checkpoints that failed (but not because of timeout); could you please check why these checkpoints failed?
2. The checkpoint data size is the delta size for the current checkpoint [1], assuming you are using incremental checkpointing.
3. In fig 1 the checkpoint size is ~3 GB, but in fig 2 the delta size can grow to ~15 GB. My gut feeling is that the state update/insert ratio for your program is very high, so that in one checkpoint you generate too many SST files.
4. From fig 2 it seems you configured execution-checkpointing-max-concurrent-checkpoints [2] to something bigger than 1; could you please set it to 1 and try again? (A minimal sketch follows below.)

[1] https://ci.apache.org/projects/flink/flink-docs-master/monitoring/checkpoint_monitoring.html#history-tab
[2] https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#execution-checkpointing-max-concurrent-checkpoints
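
To illustrate point 4, a minimal sketch using the CheckpointConfig API; the class name, interval, and pause values are only examples, and the same setting can also be put into flink-conf.yaml as execution.checkpointing.max-concurrent-checkpoints:

    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class CheckpointConfigSketch {
        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Checkpoint interval is illustrative, not taken from this thread.
            env.enableCheckpointing(5 * 60 * 1000, CheckpointingMode.EXACTLY_ONCE);

            // Only one checkpoint in flight at a time (point 4).
            env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

            // Optional: enforce a pause so a slow checkpoint cannot immediately
            // trigger the next one (value is an assumption).
            env.getCheckpointConfig().setMinPauseBetweenCheckpoints(60 * 1000);
        }
    }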
Best,
Congxian


Slotterback, Chris <Ch...@comcast.com> wrote on Sat, May 30, 2020 at 7:43 AM:
Hi there,

We are trying to upgrade a Flink app from FsStateBackend to RocksDBStateBackend to reduce overhead memory requirements. When enabling RocksDB, we see a drop in used heap memory as state moves to disk, but checkpoint durations have become inconsistent. Our data source has a stable rate of reports coming in, in parallel across partitions. The state size doesn't seem to correlate with the checkpoint duration from what I can see in the metrics. We have tried tmpfs and swap on SSDs with high IOPS, but can't get a good handle on what's causing smaller state to take longer to checkpoint. Our checkpoint location is HDFS, and it works well in our non-RocksDB cluster.
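
(For reference, the backend switch looks roughly like the sketch below; the checkpoint URI is a placeholder, and the second constructor argument controls incremental checkpointing, which we run with enabled as mentioned in the reply above.)

    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class RocksDbBackendSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Placeholder checkpoint URI; 'true' enables incremental checkpointing.
            env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/flink-checkpoints", true));

            // Previously the job ran on the heap-based backend, e.g.:
            // env.setStateBackend(new FsStateBackend("hdfs:///flink/flink-checkpoints"));
        }
    }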

Is a ~100x longer checkpoint duration expected when going from the fs to the RocksDB state backend, and is checkpoint duration normally supposed to vary this much with a consistent data source?

Chris

Re: [EXTERNAL] Re: Inconsistent checkpoint durations vs state size

Posted by "Slotterback, Chris" <Ch...@comcast.com>.
Interestingly, it turned out that the stream application design itself was causing the incremental checkpointing issues. Once checkpoints started failing, they created a positive feedback loop of failures as more and more data built up to write, and other exceptions popped up that were only indirectly related to the root cause.

Resolution so far:
We noticed that there was always one troubled (join) operator with a single subtask that took orders of magnitude longer than the others. That operator is an interval join that joins 1:many reports by key on every window trigger, and one of our keys has a significantly higher number of reports than the others, which we hypothesize caused that one subtask to crumble under larger-than-average skew. Since filtering out that particular key, checkpoints have been relatively stable and in line with their size and write time.
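
(Roughly, the join in question has this shape; the Tuple2 element types, key selectors, and time bounds below are illustrative stand-ins, not our actual code. Because intervalJoin keys both streams, every record for a hot key is routed to the same subtask, which is where the skew hurts.)

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.util.Collector;

    // Sketch of a 1:many keyed interval join; Tuple2<key, payload> stands in for the real report types.
    public class KeyedIntervalJoinSketch {
        public static DataStream<String> join(DataStream<Tuple2<String, String>> left,
                                              DataStream<Tuple2<String, String>> right) {
            return left
                    .keyBy(t -> t.f0)
                    .intervalJoin(right.keyBy(t -> t.f0))       // all records of one key land on one subtask
                    .between(Time.minutes(-5), Time.minutes(5)) // bounds are assumptions
                    .process(new ProcessJoinFunction<Tuple2<String, String>, Tuple2<String, String>, String>() {
                        @Override
                        public void processElement(Tuple2<String, String> l,
                                                   Tuple2<String, String> r,
                                                   Context ctx,
                                                   Collector<String> out) {
                            out.collect(l.f1 + "|" + r.f1);     // stand-in for the real join result
                        }
                    });
        }
    }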

Re: [EXTERNAL] Re: Inconsistent checkpoint durations vs state size

Posted by Congxian Qiu <qc...@gmail.com>.
Hi Chris

From the given exception, it seems there is something wrong with the FileSystem; one possible reason is the one Arvid gave (incremental checkpoints may generate too many small files).
You can turn off incremental checkpointing, or try increasing the config
`state.backend.fs.memory-threshold`, to see if things improve.
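
A minimal sketch of those two knobs; the values are only examples, and the keys would normally go into flink-conf.yaml (the Java Configuration form below is just for illustration):

    import org.apache.flink.configuration.Configuration;

    public class CheckpointTuningSketch {
        public static Configuration build() {
            Configuration conf = new Configuration();

            // Turn off incremental RocksDB checkpoints (same key as in flink-conf.yaml).
            conf.setString("state.backend.incremental", "false");

            // Keep small state files inline in the checkpoint metadata instead of
            // writing separate files to HDFS; the value here is an assumption.
            conf.setString("state.backend.fs.memory-threshold", "1mb");

            return conf;
        }
    }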

Best,
Congxian



Re: [EXTERNAL] Re: Inconsistent checkpoint durations vs state size

Posted by Arvid Heise <ar...@ververica.com>.
Hi Chris,

could you also try what happens when you turn incremental checkpoints off?

Incremental checkpoints may create many small files which are a bad fit for
HDFS. You could also evaluate other storage options (net drive, S3) if you
find incremental checkpoints to be better.
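
A quick sketch of that experiment, assuming the RocksDB backend is configured programmatically; the checkpoint URI is a placeholder:

    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class NonIncrementalCheckpointSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // 'false' turns incremental checkpointing off, so each checkpoint is one
            // full snapshot instead of many small SST delta files on HDFS.
            env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/flink-checkpoints", false));

            // Other storage could be evaluated by pointing the URI elsewhere,
            // e.g. an S3 bucket (placeholder): "s3://my-bucket/flink-checkpoints".
        }
    }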


-- 

Arvid Heise | Senior Java Developer

<https://www.ververica.com/>

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng