Posted to user@flink.apache.org by Yassine MARZOUGUI <y....@mindlytix.com> on 2017/04/23 18:53:45 UTC

Checkpoints very slow with high backpressure

Hi all,

I have a streaming pipeline as follows:
1 - read a folder continuously from HDFS
2 - filter duplicates (using keyBy(x -> x) and keeping a state per key
indicating whether it has been seen)
3 - schedule some future actions on the stream using a ProcessFunction and
processing-time timers (elements are kept in a MapState)
4 - write results back to HDFS using a BucketingSink.

I am using RocksDBStateBackend and Flink 1.3-SNAPSHOT (commit 9fb074c).

Currently the source contains just one 1 GB file, so that's the maximum
state that the job might hold. I noticed that the backpressure on
operators #1 and #2 is high, and the split reader has only read 60 MB out
of the 1 GB source file. I suspect this is because the ProcessFunction is
slow (on purpose). However, it looks like this affects the checkpoints, which
are failing after the timeout (which is set to 2 hours); see the attached
screenshot.

In the job manager logs I keep getting warnings:

2017-04-23 19:32:38,827 WARN
org.apache.flink.runtime.checkpoint.CheckpointCoordinator     -
Received late message for now expired checkpoint attempt 8 from
210769a077c67841d980776d8caece0a of job
6c7e44d205d738fc8a6cb4da181d2d86.

Is the high backpressure the cause of the slow checkpoints? If
so, is there a way to disable the backpressure mechanism, since the records
will be buffered in the RocksDB state anyway, which is backed by disk?

Thank you.

Best,
Yassine

Re: Checkpoints very slow with high backpressure

Posted by rhashmi <ri...@hotmail.com>.
I tried to extend the timeout to 1 hour but no luck; it is still timing out. So I
am guessing something is stuck; I will dig further.

Here are the configuration details:

Standalone cluster, with checkpoints stored in S3.

I have just 217680 messages in 24 partitions.

Any idea?




--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Checkpoints-very-slow-with-high-backpressure-tp12762p13418.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.

Re: Checkpoints very slow with high backpressure

Posted by "Zhijiang(wangzhijiang999)" <wa...@aliyun.com>.
Backpressure does indeed delay the checkpoints, because in-flight network
buffers gradually accumulate before barrier alignment.
As Piotr explained, 1.5 improves this to some extent.
After 1.5 we plan to further speed up checkpoints by controlling the channel
reader to improve barrier alignment; this has already been verified to greatly
decrease the alignment time in backpressure scenarios.

zhijiang

------------------------------------------------------------------
From: Piotr Nowojski <pi...@data-artisans.com>
Sent: Friday, 6 April 2018, 00:06
To: Edward <eg...@hotmail.com>
Cc: user <us...@flink.apache.org>
Subject: Re: Checkpoints very slow with high backpressure

Thanks for the explanation.

I hope that either 1.5 will solve your issue (please let us know if it doesn’t!) or if you can’t wait, that decreasing memory buffers can mitigate the problem.

Piotrek

> On 5 Apr 2018, at 08:13, Edward <eg...@hotmail.com> wrote:
> 
> Thanks for the update Piotr.
> 
> The reason it prevents us from using checkpoints is this:
> We are relying on the checkpoints to trigger commit of Kafka offsets for our
> source (kafka consumers).
> When there is no backpressure this works fine. When there is backpressure,
> checkpoints fail because they take too long, and our Kafka offsets are never
> committed to Kafka brokers (as we just learned the hard way).
> 
> Normally there is no backpressure in our jobs, but when there is some
> outage, then the jobs do experience 
> backpressure when catching up. And when you're already trying to recover
> from an incident, that is not the ideal time for kafka offsets commits to
> stop working.
> 
> 
> 
> 
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Checkpoints very slow with high backpressure

Posted by Piotr Nowojski <pi...@data-artisans.com>.
Thanks for the explanation.

I hope that either 1.5 will solve your issue (please let us know if it doesn’t!) or if you can’t wait, that decreasing memory buffers can mitigate the problem.

Piotrek

> On 5 Apr 2018, at 08:13, Edward <eg...@hotmail.com> wrote:
> 
> Thanks for the update Piotr.
> 
> The reason it prevents us from using checkpoints is this:
> We are relying on the checkpoints to trigger commit of Kafka offsets for our
> source (kafka consumers).
> When there is no backpressure this works fine. When there is backpressure,
> checkpoints fail because they take too long, and our Kafka offsets are never
> committed to Kafka brokers (as we just learned the hard way).
> 
> Normally there is no backpressure in our jobs, but when there is some
> outage, then the jobs do experience 
> backpressure when catching up. And when you're already trying to recover
> from an incident, that is not the ideal time for kafka offsets commits to
> stop working.


Re: Checkpoints very slow with high backpressure

Posted by Edward <eg...@hotmail.com>.
Thanks for the update Piotr.

The reason it prevents us from using checkpoints is this:
We are relying on the checkpoints to trigger commit of Kafka offsets for our
source (kafka consumers).
When there is no backpressure this works fine. When there is backpressure,
checkpoints fail because they take too long, and our Kafka offsets are never
committed to Kafka brokers (as we just learned the hard way).

Normally there is no backpressure in our jobs, but when there is some
outage, then the jobs do experience 
backpressure when catching up. And when you're already trying to recover
from an incident, that is not the ideal time for kafka offsets commits to
stop working.
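
The dependency described above (offsets reach Kafka only when a checkpoint completes) can be illustrated with a toy model. This is a sketch only, not the Kafka consumer's actual code: the class name CheckpointOffsetCommitter and its methods are hypothetical, and the "commit" is just a field update.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model (hypothetical names): offsets are committed only for completed checkpoints. */
final class CheckpointOffsetCommitter {
    // checkpointId -> offset captured when that checkpoint was triggered
    private final Map<Long, Long> pendingOffsets = new HashMap<>();
    private long committedOffset = -1L; // -1 means nothing has been committed yet

    /** Remember the read position at the moment a checkpoint starts. */
    void onCheckpointTriggered(long checkpointId, long currentOffset) {
        pendingOffsets.put(checkpointId, currentOffset);
    }

    /** Only a completed checkpoint moves the committed offset forward. */
    void onCheckpointCompleted(long checkpointId) {
        Long offset = pendingOffsets.remove(checkpointId);
        if (offset != null) {
            committedOffset = Math.max(committedOffset, offset);
        }
    }

    /** An expired (timed-out) checkpoint is dropped without committing anything. */
    void onCheckpointExpired(long checkpointId) {
        pendingOffsets.remove(checkpointId);
    }

    long committedOffset() {
        return committedOffset;
    }

    public static void main(String[] args) {
        CheckpointOffsetCommitter committer = new CheckpointOffsetCommitter();
        committer.onCheckpointTriggered(1L, 100L);
        committer.onCheckpointExpired(1L);   // checkpoint timed out under backpressure
        System.out.println("after expiry: " + committer.committedOffset());
        committer.onCheckpointTriggered(2L, 250L);
        committer.onCheckpointCompleted(2L); // checkpoint finished in time
        System.out.println("after completion: " + committer.committedOffset());
    }
}
```

In this model, if every checkpoint expires, committedOffset never advances, which matches the symptom described: the job keeps consuming, but the brokers never see a commit.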





Re: Checkpoints very slow with high backpressure

Posted by Piotr Nowojski <pi...@data-artisans.com>.
Hi,

If I’m not mistaken, this is a known issue that we have been working to resolve for the Flink 1.5 release. The problem is that under back pressure, data are buffered between nodes, and on a checkpoint all of that data must be processed before the checkpoint can complete. This is especially problematic if processing a single record takes (or can take) a significant amount of time.

With Flink 1.5 we introduced a mechanism to better control the amount of buffered data, which should address this issue (Flink 1.5 should be released within a couple of weeks).

In the meantime, you could try the Flink 1.5 release candidate that has just been published, or you could try to reduce the number of configured network buffers; keep in mind, however, that at some point this can decrease your maximum throughput:

https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/config.html#configuring-the-network-buffers
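
To get a feel for why fewer network buffers can shorten checkpoints under back pressure, here is a rough back-of-the-envelope sketch. It is only an upper-bound estimate under loud assumptions: all buffers are full and sit ahead of the barrier of one slow operator, the buffer size is 32 KiB (assumed to be the default segment size), and the average record size (1 KiB) and per-record processing time (10 ms) are made-up illustration values. The buffer count of 2048 is the taskmanager.network.numberOfBuffers value visible in the logs posted elsewhere in this thread. The class and method names are hypothetical.

```java
import java.time.Duration;

/** Rough upper-bound estimate of how long a barrier can be stuck behind buffered data. */
final class BarrierDelayEstimate {

    /** Upper bound on the bytes that can sit in network buffers ahead of a barrier. */
    static long maxBufferedBytes(int numberOfBuffers, int bufferSizeBytes) {
        return (long) numberOfBuffers * bufferSizeBytes;
    }

    /** Time to process everything that is buffered, one record at a time. */
    static Duration worstCaseDelay(long bufferedBytes, int avgRecordBytes, Duration perRecord) {
        long records = bufferedBytes / avgRecordBytes;
        return perRecord.multipliedBy(records);
    }

    public static void main(String[] args) {
        int bufferSize = 32 * 1024;          // assumption: 32 KiB per buffer
        int recordSize = 1024;               // assumption: 1 KiB average record
        Duration perRecord = Duration.ofMillis(10); // assumption: slow operator

        for (int buffers : new int[] {2048, 1024}) {
            long buffered = maxBufferedBytes(buffers, bufferSize);
            Duration delay = worstCaseDelay(buffered, recordSize, perRecord);
            System.out.println(buffers + " buffers: up to " + buffered
                    + " bytes in flight, worst-case delay " + delay.getSeconds() + " s");
        }
    }
}
```

With these assumed numbers, 2048 buffers give about 655 s (roughly 11 minutes) of worst-case delay and 1024 buffers about 327 s. Real jobs share the buffer pool across tasks, so actual delays will differ; the point is only that the delay scales with buffered data multiplied by per-record processing time.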

On the other hand, why does it prevent you from using checkpointing at all?

Piotr Nowojski 

> On 5 Apr 2018, at 06:10, Edward <eg...@hotmail.com> wrote:
> 
> I read through this thread and didn't see any resolution to the slow
> checkpoint issue (just that someone resolved their backpressure issue).
> 
> We are experiencing the same problem: 
> - When there is no backpressure, checkpoints take less than 100ms
> - When there is high backpressure, checkpoints take anywhere from 5 minutes
> to 25 minutes.
> 
> This is preventing us from using the checkpointing feature at all, since
> periodic backpressure is unavoidable.
> 
> We are experiencing this when running on Flink 1.4.0.
> We are retaining only a single checkpoint, and the size of retained
> checkpoint is less than 250KB, so there's not a lot of state.
>   state.backend: jobmanager
>   state.backend.async: true
>   state.backend.fs.checkpointdir: hdfs://checkpoints
>   state.checkpoints.num-retained: 1
>   max concurrent checkpoints: 1
>   checkpointing mode: AT_LEAST_ONCE
> 
> One other data point: if I rewrite the job to allow chaining all steps (i.e.
> same parallelism on all steps, so they fit in 1 task slot), the checkpoints
> are still slow under backpressure, but are an order of magnitude faster --
> they take about 60 seconds rather than 15 minutes.


Re: Checkpoints very slow with high backpressure

Posted by Edward <eg...@hotmail.com>.
I read through this thread and didn't see any resolution to the slow
checkpoint issue (just that someone resolved their backpressure issue).

We are experiencing the same problem: 
- When there is no backpressure, checkpoints take less than 100ms
- When there is high backpressure, checkpoints take anywhere from 5 minutes
to 25 minutes.

This is preventing us from using the checkpointing feature at all, since
periodic backpressure is unavoidable.

We are experiencing this when running on Flink 1.4.0.
We are retaining only a single checkpoint, and the size of retained
checkpoint is less than 250KB, so there's not a lot of state.
   state.backend: jobmanager
   state.backend.async: true
   state.backend.fs.checkpointdir: hdfs://checkpoints
   state.checkpoints.num-retained: 1
   max concurrent checkpoints: 1
   checkpointing mode: AT_LEAST_ONCE

One other data point: if I rewrite the job to allow chaining all steps (i.e.
same parallelism on all steps, so they fit in 1 task slot), the checkpoints
are still slow under backpressure, but are an order of magnitude faster --
they take about 60 seconds rather than 15 minutes.




Re: Checkpoints very slow with high backpressure

Posted by rhashmi <ri...@hotmail.com>.
Never mind, I found it. The backpressure was caused by an AWS RDS MySQL instance.




Re: Checkpoints very slow with high backpressure

Posted by rhashmi <ri...@hotmail.com>.
I enabled INFO logging. It seems to be stuck:


==> /mnt/ephemeral/logs/flink-flink-jobmanager-0-vpc2w2-rep-stage-flink1.log
<==
2017-06-01 12:45:18,229 INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering
checkpoint 1 @ 1496321118221

==>
/mnt/ephemeral/logs/flink-flink-taskmanager-0-vpc2w2-rep-stage-flink1.log
<==
2017-06-01 12:45:18,237 INFO  org.apache.flink.core.fs.FileSystem                          
- Created new CloseableRegistry
org.apache.flink.core.fs.SafetyNetCloseableRegistry@79e68dd3 for Async calls
on Source: Custom Source (2/12)
2017-06-01 12:45:18,237 INFO  org.apache.flink.core.fs.FileSystem                          
- Created new CloseableRegistry
org.apache.flink.core.fs.SafetyNetCloseableRegistry@78da1e82 for Async calls
on Source: Custom Source (5/12)
2017-06-01 12:45:18,238 INFO  org.apache.flink.core.fs.FileSystem                          
- Created new CloseableRegistry
org.apache.flink.core.fs.SafetyNetCloseableRegistry@68bff79e for Async calls
on Source: Custom Source (8/12)
2017-06-01 12:45:18,238 INFO  org.apache.flink.core.fs.FileSystem                          
- Created new CloseableRegistry
org.apache.flink.core.fs.SafetyNetCloseableRegistry@600bdc29 for Async calls
on Source: Custom Source (11/12)
2017-06-01 12:45:24,853 INFO  com.company.deserializer.EventDeserializer                 
- ======> KafkaConsumertest :: 
2017-06-01 12:45:24,853 INFO  com.company.deserializer.EventDeserializer                 
- ======> KafkaConsumertest :: 
2017-06-01 12:45:24,853 INFO  com.company.deserializer.EventDeserializer                 
- ======> KafkaConsumertest :: 
2017-06-01 12:45:24,854 INFO  com.company.deserializer.EventDeserializer                 
- ======> KafkaConsumertest :: 
2017-06-01 12:45:24,859 INFO 
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: jobmanager.rpc.address, host
2017-06-01 12:45:24,859 INFO 
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: jobmanager.rpc.port, 6123
2017-06-01 12:45:24,859 INFO 
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: jobmanager.heap.mb, 512
2017-06-01 12:45:24,859 INFO 
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: taskmanager.heap.mb, 1024
2017-06-01 12:45:24,859 INFO 
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: taskmanager.numberOfTaskSlots, 20
2017-06-01 12:45:24,859 INFO 
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: taskmanager.memory.preallocate, false
2017-06-01 12:45:24,859 INFO 
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: parallelism.default, 4
2017-06-01 12:45:24,859 INFO 
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: jobmanager.web.port, 8081
2017-06-01 12:45:24,859 INFO 
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: state.backend, filesystem
2017-06-01 12:45:24,860 INFO 
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: taskmanager.network.numberOfBuffers, 2048
2017-06-01 12:45:24,860 INFO 
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: taskmanager.tmp.dirs, /mnt/ephemeral/tmp
2017-06-01 12:45:24,860 INFO 
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: fs.hdfs.hadoopconf, /opt/hadoop-config
2017-06-01 12:45:24,862 INFO 
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: yarn.application-attempts, 10
2017-06-01 12:45:24,863 INFO 
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: high-availability, zookeeper
2017-06-01 12:45:24,863 INFO 
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: high-availability.zookeeper.quorum,
host1:2181,host2:2181
2017-06-01 12:45:24,863 INFO 
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: high-availability.zookeeper.storageDir,
s3://somelocation/ha-recovery/
2017-06-01 12:45:24,863 INFO 
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: high-availability.zookeeper.path.root, /flink-y
2017-06-01 12:45:24,863 INFO 
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: zookeeper.sasl.disable, true
2017-06-01 12:45:24,863 INFO 
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: taskmanager.heap.mb, 12288
2017-06-01 12:45:24,895 INFO 
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: jobmanager.rpc.address, host
2017-06-01 12:45:24,895 INFO 
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: jobmanager.rpc.port, 6123
2017-06-01 12:45:24,895 INFO 
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: jobmanager.heap.mb, 512
2017-06-01 12:45:24,895 INFO 
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: taskmanager.heap.mb, 1024
2017-06-01 12:45:24,895 INFO 
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: taskmanager.numberOfTaskSlots, 20
2017-06-01 12:45:24,895 INFO 
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: taskmanager.memory.preallocate, false
2017-06-01 12:45:24,895 INFO 
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: parallelism.default, 4
2017-06-01 12:45:24,895 INFO 
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: jobmanager.web.port, 8081
2017-06-01 12:45:24,895 INFO 
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: state.backend, filesystem
2017-06-01 12:45:24,895 INFO 
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: taskmanager.network.numberOfBuffers, 2048
2017-06-01 12:45:24,895 INFO 
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: taskmanager.tmp.dirs, /mnt/ephemeral/tmp
2017-06-01 12:45:24,895 INFO 
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: fs.hdfs.hadoopconf, /opt/hadoop-config
2017-06-01 12:45:24,895 INFO 
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: yarn.application-attempts, 10
2017-06-01 12:45:24,895 INFO 
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: high-availability, zookeeper
2017-06-01 12:45:24,895 INFO 
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: high-availability.zookeeper.quorum,
host1:2181,host2:2181
2017-06-01 12:45:24,895 INFO 
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: high-availability.zookeeper.storageDir,
s3://somelocation/ha-recovery/
2017-06-01 12:45:24,895 INFO 
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: high-availability.zookeeper.path.root, /flink-y
2017-06-01 12:45:24,896 INFO 
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: zookeeper.sasl.disable, true
2017-06-01 12:45:24,896 INFO 
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: taskmanager.heap.mb, 12288
2017-06-01 12:45:24,902 INFO  com.company.deserializer.EventDeserializer                 
- ======> KafkaConsumer :: 
2017-06-01 12:45:24,905 INFO  com.company.deserializer.EventDeserializer                 
- ======> KafkaConsumer :: 
2017-06-01 12:45:24,909 INFO 
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: jobmanager.rpc.address, host
2017-06-01 12:45:24,909 INFO 
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: jobmanager.rpc.port, 6123
2017-06-01 12:45:24,910 INFO 
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: jobmanager.heap.mb, 512
2017-06-01 12:45:24,910 INFO 
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: taskmanager.heap.mb, 1024
2017-06-01 12:45:24,910 INFO 
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: taskmanager.numberOfTaskSlots, 20
2017-06-01 12:45:24,910 INFO 
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: taskmanager.memory.preallocate, false
2017-06-01 12:45:24,910 INFO 
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: parallelism.default, 4
2017-06-01 12:45:24,910 INFO 
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: jobmanager.web.port, 8081
2017-06-01 12:45:24,910 INFO 
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: state.backend, filesystem
2017-06-01 12:45:24,910 INFO 
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: taskmanager.network.numberOfBuffers, 2048
2017-06-01 12:45:24,910 INFO 
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: taskmanager.tmp.dirs, /mnt/ephemeral/tmp
2017-06-01 12:45:24,910 INFO 
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: fs.hdfs.hadoopconf, /opt/hadoop-config
2017-06-01 12:45:24,910 INFO 
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: yarn.application-attempts, 10
2017-06-01 12:45:24,910 INFO 
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: high-availability, zookeeper
2017-06-01 12:45:24,910 INFO 
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: high-availability.zookeeper.quorum,
host1:2181,host2:2181
2017-06-01 12:45:24,910 INFO 
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: high-availability.zookeeper.storageDir,
s3://somelocation/ha-recovery/
2017-06-01 12:45:24,911 INFO 
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: high-availability.zookeeper.path.root, /flink-y
2017-06-01 12:45:24,911 INFO 
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: zookeeper.sasl.disable, true
2017-06-01 12:45:24,911 INFO 
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: taskmanager.heap.mb, 12288
2017-06-01 12:45:24,915 INFO  com.company.deserializer.EventDeserializer                 
- ======> KafkaConsumer :: 
2017-06-01 12:45:24,916 INFO  com.company.deserializer.EventDeserializer                 
- ======> KafkaConsumer :: 
2017-06-01 12:45:24,923 INFO 
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: jobmanager.rpc.address, host
2017-06-01 12:45:24,924 INFO 
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: jobmanager.rpc.port, 6123
2017-06-01 12:45:24,924 INFO 
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: jobmanager.heap.mb, 512
2017-06-01 12:45:24,924 INFO 
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: taskmanager.heap.mb, 1024
2017-06-01 12:45:24,924 INFO 
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: taskmanager.numberOfTaskSlots, 20
2017-06-01 12:45:24,924 INFO 
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: taskmanager.memory.preallocate, false
2017-06-01 12:45:24,924 INFO 
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: parallelism.default, 4
2017-06-01 12:45:24,924 INFO 
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: jobmanager.web.port, 8081
2017-06-01 12:45:24,924 INFO 
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: state.backend, filesystem
2017-06-01 12:45:24,924 INFO 
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: taskmanager.network.numberOfBuffers, 2048
2017-06-01 12:45:24,924 INFO 
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: taskmanager.tmp.dirs, /mnt/ephemeral/tmp
2017-06-01 12:45:24,924 INFO 
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: fs.hdfs.hadoopconf, /opt/hadoop-config
2017-06-01 12:45:24,924 INFO 
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: yarn.application-attempts, 10
2017-06-01 12:45:24,924 INFO 
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: high-availability, zookeeper
2017-06-01 12:45:24,924 INFO 
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: high-availability.zookeeper.quorum,
host1:2181,host2:2181
2017-06-01 12:45:24,925 INFO 
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: high-availability.zookeeper.storageDir,
s3://somelocation/ha-recovery/
2017-06-01 12:45:24,925 INFO 
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: high-availability.zookeeper.path.root, /flink-y
2017-06-01 12:45:24,925 INFO 
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: zookeeper.sasl.disable, true
2017-06-01 12:45:24,925 INFO 
org.apache.flink.configuration.GlobalConfiguration            - Loading
configuration property: taskmanager.heap.mb, 12288
2017-06-01 12:45:25,187 INFO  org.apache.flink.core.fs.FileSystem                          
- Ensuring all FileSystem streams are closed for Async calls on Source:
Custom Source (11/12)
2017-06-01 12:45:25,188 INFO  org.apache.flink.core.fs.FileSystem                          
- Ensuring all FileSystem streams are closed for Async calls on Source:
Custom Source (2/12)
2017-06-01 12:45:25,196 INFO  org.apache.flink.core.fs.FileSystem                          
- Ensuring all FileSystem streams are closed for Async calls on Source:
Custom Source (5/12)
2017-06-01 12:45:25,197 INFO  com.company.deserializer.EventDeserializer                 
- ======> KafkaConsumertest :: 
2017-06-01 12:45:25,203 INFO  org.apache.flink.core.fs.FileSystem                          
- Ensuring all FileSystem streams are closed for Async calls on Source:
Custom Source (8/12)
2017-06-01 12:45:25,227 INFO  com.company.deserializer.EventDeserializer                 
- ======> KafkaConsumer :: 
2017-06-01 12:45:25,257 INFO  com.company.deserializer.EventDeserializer                 
- ======> KafkaConsumertest :: 
2017-06-01 12:45:25,277 INFO  com.company.deserializer.EventDeserializer                 
- ======> KafkaConsumer :: 

==> /mnt/ephemeral/logs/flink-flink-client-vpc2w2-rep-stage-flink1.log <==
2017-06-01 12:45:45,350 WARN 
org.apache.flink.runtime.client.JobSubmissionClientActor      - Discard
message LeaderSessionMessage(null,ConnectionTimeout) because the expected
leader session ID 2d2a8eac-b837-4605-93cc-81720247f247 did not equal the
received leader session ID null.

==> /mnt/ephemeral/logs/flink-flink-jobmanager-0-vpc2w2-rep-stage-flink1.log
<==
2017-06-01 12:55:18,229 INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Checkpoint 1
expired before completing.
2017-06-01 12:55:18,233 INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering
checkpoint 2 @ 1496321718230

==>
/mnt/ephemeral/logs/flink-flink-taskmanager-0-vpc2w2-rep-stage-flink1.log
<==
2017-06-01 12:55:18,235 INFO  org.apache.flink.core.fs.FileSystem                          
- Created new CloseableRegistry
org.apache.flink.core.fs.SafetyNetCloseableRegistry@44074ae6 for Async calls
on Source: Custom Source (2/12)
2017-06-01 12:55:18,235 INFO  org.apache.flink.core.fs.FileSystem                          
- Created new CloseableRegistry
org.apache.flink.core.fs.SafetyNetCloseableRegistry@463dc5a1 for Async calls
on Source: Custom Source (5/12)
2017-06-01 12:55:18,236 INFO  org.apache.flink.core.fs.FileSystem                          
- Created new CloseableRegistry
org.apache.flink.core.fs.SafetyNetCloseableRegistry@7871a1bb for Async calls
on Source: Custom Source (8/12)
2017-06-01 12:55:18,237 INFO  org.apache.flink.core.fs.FileSystem                          
- Created new CloseableRegistry
org.apache.flink.core.fs.SafetyNetCloseableRegistry@57df8c1d for Async calls
on Source: Custom Source (11/12)

==> /mnt/ephemeral/logs/flink-flink-jobmanager-0-vpc2w2-rep-stage-flink1.log
<==
2017-06-01 12:58:30,764 WARN 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Received
late message for now expired checkpoint attempt 1 from
c601dd04affa7da13a226daa222062e7 of job 303656ace348131ed7a38bb02b4fe374.





Re: Checkpoints very slow with high backpressure

Posted by rhashmi <ri...@hotmail.com>.
I tried to extend the timeout to 1 hour but no luck; it is still timing out, and
there is no exception in the log file. So I am guessing something is stuck; I
will dig further.

Here are the configuration details:

Standalone cluster, with checkpoints stored in S3.

I have just 217680 messages in 24 partitions.

Any idea?




Re: Checkpoints very slow with high backpressure

Posted by Chen Qin <qi...@gmail.com>.
What is the root cause of the back pressure?
The reason I ask is that we investigated and added metrics to measure the time
to process each event, and ended up finding the bottleneck in frequent managed-state
updates. Our approach was to keep an in-memory cache and periodically update the
state before the checkpointing cycle kicks in.

This thread might be related:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/large-sliding-window-perf-question-td13277.html#none
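
The cache-then-flush idea described above can be sketched in plain Java. This is a minimal illustration under assumptions, not the actual implementation from the message: a java.util.Map stands in for the managed state, the class name WriteBehindCounter is hypothetical, and where exactly flush() gets hooked into the job is not specified in the message. In a real job, anything still in the cache when a failure occurs would be lost unless it is flushed before the snapshot is taken.

```java
import java.util.HashMap;
import java.util.Map;

/** Accumulates updates in memory and writes them to the backing state in batches. */
final class WriteBehindCounter {
    private final Map<String, Long> backingState; // stand-in for managed state
    private final Map<String, Long> cache = new HashMap<>();
    private int stateWrites = 0;                  // counts expensive state updates

    WriteBehindCounter(Map<String, Long> backingState) {
        this.backingState = backingState;
    }

    /** Cheap path: only touches the in-memory cache. */
    void add(String key, long delta) {
        cache.merge(key, delta, Long::sum);
    }

    /** Expensive path: one state update per distinct key, then the cache is cleared. */
    void flush() {
        for (Map.Entry<String, Long> entry : cache.entrySet()) {
            backingState.merge(entry.getKey(), entry.getValue(), Long::sum);
            stateWrites++;
        }
        cache.clear();
    }

    int stateWrites() {
        return stateWrites;
    }

    public static void main(String[] args) {
        Map<String, Long> state = new HashMap<>();
        WriteBehindCounter counter = new WriteBehindCounter(state);
        for (int i = 0; i < 1000; i++) {
            counter.add("user-" + (i % 10), 1L); // 1000 events over 10 keys
        }
        counter.flush();
        System.out.println("state writes: " + counter.stateWrites() + ", state: " + state);
    }
}
```

The design trade-off is fewer state updates per event (here 10 writes instead of 1000) in exchange for having to make sure the cache is flushed before each checkpoint.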

Chen

On Wed, May 31, 2017 at 7:19 PM, SHI Xiaogang <sh...@gmail.com>
wrote:

> Hi rhashmi
>
> We are also experiencing slow checkpoints when there exist back pressure.
> It seems there is no good method to handle back pressure now.
>
> We work around it by setting a larger number of checkpoint timeout. The
> default value is 10min. But checkpoints usually take more time to complete
> when there exists back pressure.  You can set it via
> `CheckpointConfig#setCheckpointTimeout()`.
>
> Regards,
> Xiaogang

Re: Checkpoints very slow with high backpressure

Posted by SHI Xiaogang <sh...@gmail.com>.
Hi rhashmi,

We are also experiencing slow checkpoints when there is back pressure.
It seems there is no good way to handle back pressure at the moment.

We work around it by setting a larger checkpoint timeout. The default
value is 10 minutes, but checkpoints usually take longer to complete
when there is back pressure. You can set it via
`CheckpointConfig#setCheckpointTimeout()`.
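
As a rough sketch of this workaround, assuming the Flink streaming Java API is on the classpath: the class name `CheckpointTimeoutExample`, the 5-minute interval, and the 1-hour timeout are illustrative choices, not values recommended in this thread.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointTimeoutExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Trigger a checkpoint every 5 minutes (interval chosen for illustration only).
        env.enableCheckpointing(5 * 60 * 1000L);

        // Raise the checkpoint timeout from the 10-minute default to 1 hour (value in milliseconds).
        env.getCheckpointConfig().setCheckpointTimeout(60 * 60 * 1000L);

        // ... define sources, operators, and sinks here, then call env.execute() ...
    }
}
```

A longer timeout only gives slow checkpoints more time to finish; it does not remove the back pressure itself.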

Regards,
Xiaogang



2017-06-01 5:36 GMT+08:00 rhashmi <ri...@hotmail.com>:

> So what is the resolution? Flink is consuming messages from Kafka. Flink went
> down about a day ago, so now it has to process 24 hours' worth of events.
> But I am hitting backpressure, and right now checkpoints are timing out. Is
> there any recommendation on how to handle this situation?
>
> It seems the triggers are also not firing, so no updates are being made to
> the downstream database.
>
> Is there a recommended approach for handling backpressure?
>
> Version: Flink 1.2.

Re: Checkpoints very slow with high backpressure

Posted by rhashmi <ri...@hotmail.com>.
So what is the resolution? Flink is consuming messages from Kafka. Flink went
down about a day ago, so now it has to process 24 hours' worth of events.
But I am hitting backpressure, and right now checkpoints are timing out. Is
there any recommendation on how to handle this situation?

It seems the triggers are also not firing, so no updates are being made to
the downstream database.

Is there a recommended approach for handling backpressure?

Version: Flink 1.2.







Re: Checkpoints very slow with high backpressure

Posted by Yassine MARZOUGUI <y....@mindlytix.com>.
Hi Ufuk,

The ProcessFunction receives elements and buffers them in a MapState, and
periodically (for example, every x seconds) registers processing-time timers
(according to rules it gets from a connected rule stream). When a timer
fires, I pop the next element from the state, send a request to an external
server, and collect the response.
The requests to the external server should happen periodically rather than
continuously, which is why I control them with timers and buffer the
elements in RocksDB state.
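
The pattern described above can be modelled in plain Java roughly as follows. This is a sketch, not the actual job code: `ThrottledBuffer`, `onElement`, `advanceTo`, and `externalCall` are hypothetical names, an in-memory deque stands in for the MapState, and time is passed in explicitly instead of using Flink's timer service.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Function;

/** Plain-Java model of "buffer every element, release one per timer firing". */
public class ThrottledBuffer {
    private final Deque<String> buffer = new ArrayDeque<>(); // stand-in for the MapState buffer
    private final long intervalMs;                            // the "every x seconds" from the rules
    private final Function<String, String> externalCall;      // stand-in for the external server
    private long nextTimer = -1;                               // -1 means no timer is registered

    public ThrottledBuffer(long intervalMs, Function<String, String> externalCall) {
        this.intervalMs = intervalMs;
        this.externalCall = externalCall;
    }

    /** Analogue of processElement: store the element and make sure a timer is pending. */
    public void onElement(String element, long now) {
        buffer.addLast(element);
        if (nextTimer < 0) {
            nextTimer = now + intervalMs;
        }
    }

    /** Analogue of onTimer: fire every timer that is due, emitting one response per firing. */
    public List<String> advanceTo(long now) {
        List<String> out = new ArrayList<>();
        while (nextTimer >= 0 && nextTimer <= now) {
            out.add(externalCall.apply(buffer.pollFirst()));
            // Re-register only while elements remain buffered.
            nextTimer = buffer.isEmpty() ? -1 : nextTimer + intervalMs;
        }
        return out;
    }

    public int buffered() { return buffer.size(); }
}
```

In this model `onElement` is cheap and only the timer path calls the external server, so the output rate is capped at one element per interval regardless of the input rate.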

2017-04-24 13:48 GMT+02:00 Ufuk Celebi <uc...@apache.org>:

> @Yassine: no, there is no way to disable the back pressure mechanism. Do
> you have more details about the last two operators? What do you mean by
> "the process function is slow on purpose"?
>
> @Rune: with 1.3, Flink will configure the internal buffers so that not too
> much data is buffered in them
> (https://issues.apache.org/jira/browse/FLINK-4545). You could try the
> current master and check whether it improves the checkpointing behaviour
> under back pressure. Out of curiosity, are you using the async I/O API for
> the communication with the external REST service
> (https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/asyncio.html)?
>
> – Ufuk

Re: Checkpoints very slow with high backpressure

Posted by Ufuk Celebi <uc...@apache.org>.
@Yassine: no, there is no way to disable the back pressure mechanism. Do
you have more details about the last two operators? What do you mean by
"the process function is slow on purpose"?

@Rune: with 1.3, Flink will configure the internal buffers so that not too
much data is buffered in them
(https://issues.apache.org/jira/browse/FLINK-4545). You could try the
current master and check whether it improves the checkpointing behaviour
under back pressure. Out of curiosity, are you using the async I/O API for
the communication with the external REST service
(https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/asyncio.html)?
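
To illustrate the general idea behind asynchronous enrichment, here is a plain-Java sketch that does not use Flink's async I/O API. `AsyncEnricher`, `enrich`, and `restCall` are hypothetical names, and the thread pool running a blocking call is an assumption made to keep the example self-contained; a real asynchronous HTTP client would not need one thread per request.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.function.Function;

/** Plain-Java model of asynchronous enrichment with a cap on in-flight requests. */
public class AsyncEnricher {
    private final ExecutorService pool;
    private final Semaphore inFlight;
    private final Function<String, String> restCall; // stand-in for the external REST service

    public AsyncEnricher(int capacity, Function<String, String> restCall) {
        this.pool = Executors.newFixedThreadPool(capacity);
        this.inFlight = new Semaphore(capacity);
        this.restCall = restCall;
    }

    /** Submits one request; blocks the caller only when `capacity` requests are already pending. */
    public CompletableFuture<String> enrich(String record) {
        inFlight.acquireUninterruptibly();
        return CompletableFuture
                .supplyAsync(() -> restCall.apply(record), pool)
                .whenComplete((result, error) -> inFlight.release());
    }

    public void shutdown() { pool.shutdown(); }

    public static void main(String[] args) {
        AsyncEnricher enricher = new AsyncEnricher(4, r -> r + ":enriched");
        List<CompletableFuture<String>> pending = new ArrayList<>();
        for (String r : new String[] {"a", "b", "c"}) {
            pending.add(enricher.enrich(r));
        }
        for (CompletableFuture<String> f : pending) {
            System.out.println(f.join()); // results are read in submission order
        }
        enricher.shutdown();
    }
}
```

With several requests in flight at once, the time spent waiting on the external service overlaps instead of adding up per record.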

– Ufuk


On Mon, Apr 24, 2017 at 11:08 AM, Rune Skou Larsen <rs...@trifork.com> wrote:

> Sorry, I can't help you, but we are also experiencing slow checkpointing
> when there is backpressure from the sink.
>
> I tried the HDFS, S3, and RocksDB state backends, but to no avail:
> checkpointing always times out under backpressure.
>
> Can we somehow reduce Flink's internal buffer sizes, so that checkpointing
> under backpressure becomes faster?
>
> - Rune
>
> ---
>
> Our current setup (improvement suggestions welcome!):
>
> Flink 1.2.0,  yarn@AWS EMR, 1 master + 3 slaves, m4.xlarge
>
> program_parallelism: 12
> taskmanagers: 6
> slotsPerTaskManager: 4
> taskmanager_heap_mb: 4096
> jobmanager_heap_mb: 1024
>
> Basic program structure:
>
> 1) read batch from Kinesis
>
> 2) Split batch and shuffle using custom partitioner (consistent hashing).
>
> 3) enrich using external REST service
>
> 4) Write to database (This step is the bottleneck)

Re: Checkpoints very slow with high backpressure

Posted by Rune Skou Larsen <rs...@trifork.com>.
Sorry, I can't help you, but we are also experiencing slow checkpointing
when there is backpressure from the sink.

I tried the HDFS, S3, and RocksDB state backends, but to no avail:
checkpointing always times out under backpressure.

Can we somehow reduce Flink's internal buffer sizes, so that checkpointing
under backpressure becomes faster?

- Rune

---

Our current setup (improvement suggestions welcome!):

Flink 1.2.0,  yarn@AWS EMR, 1 master + 3 slaves, m4.xlarge

program_parallelism: 12
taskmanagers: 6
slotsPerTaskManager: 4
taskmanager_heap_mb: 4096
jobmanager_heap_mb: 1024

Basic program structure:

1) read batch from Kinesis

2) Split batch and shuffle using custom partitioner (consistent hashing).

3) enrich using external REST service

4) Write to database (This step is the bottleneck)

On 24-04-2017 09:32, Yassine MARZOUGUI wrote:
> I'm sorry if you received multiple copies of this mail. I kept trying to
> send it yesterday, but it looks like the mailing list was stuck and didn't
> dispatch it until now. Sorry for the disturbance.
>
-- 

Venlig hilsen/Best regards *Rune Skou Larsen*

Trifork Public A/S Dyssen 1 · DK-8200 Aarhus N · Denmark Phone +45
3160 2497 Skype: rsltrifork Twitter: RuneSkouLarsen


Re: Checkpoints very slow with high backpressure

Posted by Yassine MARZOUGUI <y....@mindlytix.com>.
I'm sorry if you received multiple copies of this mail. I kept trying to
send it yesterday, but it looks like the mailing list was stuck and didn't
dispatch it until now. Sorry for the disturbance.

On Apr 23, 2017 20:53, "Yassine MARZOUGUI" <y....@mindlytix.com>
wrote:

> Hi all,
>
> I have a streaming pipeline as follows:
> 1 - read a folder continuously from HDFS
> 2 - filter duplicates (using keyBy(x -> x) and keeping a state per key
> indicating whether it has been seen)
> 3 - schedule some future actions on the stream using a ProcessFunction and
> processing-time timers (elements are kept in a MapState)
> 4 - write results back to HDFS using a BucketingSink.
>
> I am using the RocksDBStateBackend and Flink 1.3-SNAPSHOT (commit: 9fb074c).
>
> Currently the source contains just one file of 1 GB, so that is the maximum
> state that the job might hold. I noticed that the backpressure on
> operators #1 and #2 is High, and the split reader has only read 60 MB out
> of the 1 GB source file. I suspect this is because the ProcessFunction is
> slow (on purpose). However, it looks like this affects the checkpoints, which
> are failing after the timeout (which is set to 2 hours); see the attached
> screenshot.
>
> In the job manager logs I keep getting warnings:
>
> 2017-04-23 19:32:38,827 WARN  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Received late message for now expired checkpoint attempt 8 from 210769a077c67841d980776d8caece0a of job 6c7e44d205d738fc8a6cb4da181d2d86.
>
> Is the high backpressure the cause of the checkpoints being so slow? If
> so, is there a way to disable the backpressure mechanism, since the records
> will be buffered in the RocksDB state anyway, which is backed by the disk?
>
> Thank you.
>
> Best,
> Yassine
>
>