Posted to user@flink.apache.org by "Sofer, Tovi " <to...@citi.com> on 2018/01/30 17:17:14 UTC

Sync and Async checkpoint time

Hi group,

In our project we are using the asynchronous FsStateBackend, and we are trying to move to distributed storage - currently S3.
When using this storage we are experiencing high backpressure and high latency compared to local storage.
We are trying to understand the reason, since the checkpoint is asynchronous and therefore shouldn't have such a strong effect.

We looked at the checkpoint history in the web UI, and at details from the log.

* From the web UI it seems that the Sync checkpoint duration is much higher than the Async duration. (Again, this is only when using S3, not when using local storage.)
This happens especially in window operators (tumbling windows) such as below.

* But from the log, the Sync time seems very short...


Do you have an explanation for why the async write to the FsStateBackend has such a strong effect on the stream performance?

Checkpoint config:

env.enableCheckpointing(60000);
env.setStateBackend(new FsStateBackend(checkpointDirURI, true));
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(20000);

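For completeness, a minimal, self-contained skeleton around this snippet (the class name and the S3 URI below are placeholders; the actual job graph is omitted):

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder URI; in the setup above this points at the S3 checkpoint directory.
        String checkpointDirURI = "s3://<bucket>/flink_checkpoints";

        env.enableCheckpointing(60000);                                   // checkpoint every 60 s
        env.setStateBackend(new FsStateBackend(checkpointDirURI, true));  // true = asynchronous snapshots
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(20000);   // at least 20 s between checkpoints

        // ... sources, windows and sinks omitted ...
        env.execute("checkpoint-config-example");
    }
}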


* Checkpoint info from console:
<image004.png>


* Checkpoint info from log:
2018-01-30 07:33:36,416 INFO  org.apache.flink.runtime.state.DefaultOperatorStateBackend - [pool-42-thread-1] DefaultOperatorStateBackend snapshot (File Stream Factory @ s3://artemis/prod_cluster_4/flink_checkpoints/747c4cef2841d2ab090d9ed97e0357cc, asynchronous part) in thread Thread[pool-42-thread-1,5,Flink Task Threads] took 12139 ms.
2018-01-30 07:33:36,418 DEBUG org.apache.flink.runtime.checkpoint.CheckpointCoordinator - [flink-akka.actor.default-dispatcher-83] Received acknowledge message for checkpoint 52 from task 19ae368e935f177b513577256505ff37 of job 747c4cef2841d2ab090d9ed97e0357cc.
2018-01-30 07:33:36,676 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend - [pool-35-thread-1] Heap backend snapshot (File Stream Factory @ s3://artemis/prod_cluster_4/flink_checkpoints/747c4cef2841d2ab090d9ed97e0357cc, asynchronous part) in thread Thread[pool-35-thread-1,5,Flink Task Threads] took 12396 ms.
2018-01-30 07:33:36,677 DEBUG org.apache.flink.runtime.checkpoint.CheckpointCoordinator - [flink-akka.actor.default-dispatcher-79] Received acknowledge message for checkpoint 52 from task 19bf0464a13f3c9bd559b7559d166de2 of job 747c4cef2841d2ab090d9ed97e0357cc.
2018-01-30 07:33:37,347 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend - [pool-17-thread-1] Heap backend snapshot (File Stream Factory @ s3://artemis/prod_cluster_4/flink_checkpoints/747c4cef2841d2ab090d9ed97e0357cc, asynchronous part) in thread Thread[pool-17-thread-1,5,Flink Task Threads] took 13067 ms.
2018-01-30 07:33:37,349 DEBUG org.apache.flink.runtime.checkpoint.CheckpointCoordinator - [flink-akka.actor.default-dispatcher-79] Received acknowledge message for checkpoint 52 from task 97e169eefde0c57f52a874dee5a0b5a2 of job 747c4cef2841d2ab090d9ed97e0357cc.
2018-01-30 07:33:37,418 INFO  org.apache.flink.runtime.state.DefaultOperatorStateBackend - [pool-29-thread-1] DefaultOperatorStateBackend snapshot (File Stream Factory @ s3://artemis/prod_cluster_4/flink_checkpoints/747c4cef2841d2ab090d9ed97e0357cc, asynchronous part) in thread Thread[pool-29-thread-1,5,Flink Task Threads] took 13143 ms.
2018-01-30 07:33:37,420 DEBUG org.apache.flink.runtime.checkpoint.CheckpointCoordinator - [flink-akka.actor.default-dispatcher-79] Received acknowledge message for checkpoint 52 from task 19b990204530184a97fbaad373dfaf11 of job 747c4cef2841d2ab090d9ed97e0357cc.
2018-01-30 07:33:37,508 INFO  org.apache.flink.runtime.state.DefaultOperatorStateBackend - [pool-33-thread-1] DefaultOperatorStateBackend snapshot (File Stream Factory @ s3://artemis/prod_cluster_4/flink_checkpoints/747c4cef2841d2ab090d9ed97e0357cc, asynchronous part) in thread Thread[pool-33-thread-1,5,Flink Task Threads] took 13234 ms.
2018-01-30 07:33:37,509 DEBUG org.apache.flink.runtime.checkpoint.CheckpointCoordinator - [flink-akka.actor.default-dispatcher-79] Received acknowledge message for checkpoint 52 from task 5012eda50495ae33ca38e2478b3f8e0d of job 747c4cef2841d2ab090d9ed97e0357cc.
2018-01-30 07:33:37,589 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend - [ParsedOrdersDelayWindow (2/4)] Heap backend snapshot (File Stream Factory @ s3://artemis/prod_cluster_4/flink_checkpoints/747c4cef2841d2ab090d9ed97e0357cc, synchronous part) in thread Thread[ParsedOrdersDelayWindow (2/4),5,Flink Task Threads] took 1 ms.
2018-01-30 07:33:37,678 INFO  org.apache.flink.runtime.state.DefaultOperatorStateBackend - [pool-49-thread-1] DefaultOperatorStateBackend snapshot (File Stream Factory @ s3://artemis/prod_cluster_4/flink_checkpoints/747c4cef2841d2ab090d9ed97e0357cc, asynchronous part) in thread Thread[pool-49-thread-1,5,Flink Task Threads] took 13403 ms.
2018-01-30 07:33:37,680 DEBUG org.apache.flink.runtime.checkpoint.CheckpointCoordinator - [flink-akka.actor.default-dispatcher-79] Received acknowledge message for checkpoint 52 from task 6c26b698209523f6a0c77191b2bcb491 of job 747c4cef2841d2ab090d9ed97e0357cc.
2018-01-30 07:33:38,143 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend - [pool-25-thread-1] Heap backend snapshot (File Stream Factory @ s3://artemis/prod_cluster_4/flink_checkpoints/747c4cef2841d2ab090d9ed97e0357cc, asynchronous part) in thread Thread[pool-25-thread-1,5,Flink Task Threads] took 13863 ms.

Thanks & regards,
Tovi


Re: Sync and Async checkpoint time

Posted by Stefan Richter <s....@data-artisans.com>.
Hi,

There is currently no workaround for this limitation if your operator uses timers, but it is pretty high on our TODO list for release 1.6.

Best,
Stefan

> On 31.01.2018, at 09:29, Sofer, Tovi <to...@citi.com> wrote:
> 
> Hi Stefan,
>  
> Thank you for the answer.
> So you mean that any use of windows in the stream will result in synchronous snapshotting?
> When are you planning to fix this?  
> And is there a workaround?
>  
> Thanks again,
> Tovi
> From: Stefan Richter [mailto:s.richter@data-artisans.com] 
> Sent: Tuesday, 30 January 2018 21:10
> To: Sofer, Tovi [ICG-IT] <ts...@imceu.eu.ssmb.com>
> Cc: user@flink.apache.org
> Subject: Re: Sync and Async checkpoint time
>  
> Hi,
>  
> This looks like the timer service is the culprit for this problem. Timers are currently not stored in the state backend, but in a separate on-heap data structure that does not support copy-on-write or async snapshots in general. Therefore, writing the timers for a snapshot is always synchronous, and this explanation would also match your observation that the problem mainly affects window operators, which make heavy use of timers.
>  
> Best,
> Stefan
> 
> 
> On 30.01.2018, at 18:17, Sofer, Tovi <tovi.sofer@citi.com> wrote:
> [...]


RE: Sync and Async checkpoint time

Posted by "Sofer, Tovi " <to...@citi.com>.
Hi Stefan,

Thank you for the answer.
So you mean that any use of windows in the stream will result in synchronous snapshotting?
When are you planning to fix this?
And is there a workaround?

Thanks again,
Tovi
From: Stefan Richter [mailto:s.richter@data-artisans.com]
Sent: Tuesday, 30 January 2018 21:10
To: Sofer, Tovi [ICG-IT] <ts...@imceu.eu.ssmb.com>
Cc: user@flink.apache.org
Subject: Re: Sync and Async checkpoint time

Hi,

This looks like the timer service is the culprit for this problem. Timers are currently not stored in the state backend, but in a separate on-heap data structure that does not support copy-on-write or async snapshots in general. Therefore, writing the timers for a snapshot is always synchronous, and this explanation would also match your observation that the problem mainly affects window operators, which make heavy use of timers.

Best,
Stefan


On 30.01.2018, at 18:17, Sofer, Tovi <to...@citi.com> wrote:
[...]


Re: Sync and Async checkpoint time

Posted by Stefan Richter <s....@data-artisans.com>.
Hi,

This looks like the timer service is the culprit for this problem. Timers are currently not stored in the state backend, but in a separate on-heap data structure that does not support copy-on-write or async snapshots in general. Therefore, writing the timers for a snapshot is always synchronous, and this explanation would also match your observation that the problem mainly affects window operators, which make heavy use of timers.
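
For illustration, here is a minimal sketch of how a keyed operator accumulates such timers (assuming the 1.4-era ProcessFunction API; the class name and contents are made up, and window operators register timers the same way internally):

import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Hypothetical example: each registerEventTimeTimer() call adds an entry to the
// operator's on-heap timer queue, which is snapshotted synchronously on checkpoint.
public class DelayedEmitFunction extends ProcessFunction<String, String> {

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        // Schedule a timer one minute past the element's event timestamp.
        ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 60_000L);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        // With many active keys there are many pending timers on the heap,
        // all of which are written in the synchronous part of the snapshot.
        out.collect("timer fired at " + timestamp);
    }
}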

Best,
Stefan

> On 30.01.2018, at 18:17, Sofer, Tovi <to...@citi.com> wrote:
> [...]