You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Tony Wei <to...@gmail.com> on 2017/09/26 15:08:44 UTC

Stream Task seems to be blocked after checkpoint timeout

Hi,

Something weird happened on my streaming job.

I found my streaming job seems to be blocked for a long time and I saw the
situation like the picture below. (chk #1245 and #1246 were all finishing
7/8 tasks then marked timeout by JM. Other checkpoints failed with the same
state like #1247 util I restarted TM.)

[image: 內置圖片 1]

I'm not sure what happened, but the consumer stopped fetching records,
buffer usage is 100% and the following task did not seem to fetch data
anymore. Just like the whole TM was stopped.

However, after I restarted TM and force the job restarting from the latest
completed checkpoint, everything worked again. And I don't know how to
reproduce it.

The attachment is my TM log. Because there are many user logs and sensitive
information, I only remain the log from `org.apache.flink...`.

My cluster setting is one JM and one TM with 4 available slots.

Streaming job uses all slots, checkpoint interval is 5 mins and max
concurrent number is 3.

Please let me know if it needs more information to find out what happened
on my streaming job. Thanks for your help.

Best Regards,
Tony Wei

Re: Stream Task seems to be blocked after checkpoint timeout

Posted by Tony Wei <to...@gmail.com>.
Hi,

These are some metrics before I stopped TM. The black parts are sensitive
to our company, so I need to hide them. Sorry about that.
Hope this will help to understand what happened to my streaming job. Thank
you.

This job only has two tasks. The tasks and operators for records out per
second(records in per second) are in front of(behind of) the buffer.
[image: 內置圖片 1]

Best Regards,
Tony Wei

2017-09-26 23:08 GMT+08:00 Tony Wei <to...@gmail.com>:

> Hi,
>
> Something weird happened on my streaming job.
>
> I found my streaming job seems to be blocked for a long time and I saw the
> situation like the picture below. (chk #1245 and #1246 were all finishing
> 7/8 tasks then marked timeout by JM. Other checkpoints failed with the same
> state like #1247 util I restarted TM.)
>
> [image: 內置圖片 1]
>
> I'm not sure what happened, but the consumer stopped fetching records,
> buffer usage is 100% and the following task did not seem to fetch data
> anymore. Just like the whole TM was stopped.
>
> However, after I restarted TM and force the job restarting from the latest
> completed checkpoint, everything worked again. And I don't know how to
> reproduce it.
>
> The attachment is my TM log. Because there are many user logs
> and sensitive information, I only remain the log from `org.apache.flink...`.
>
> My cluster setting is one JM and one TM with 4 available slots.
>
> Streaming job uses all slots, checkpoint interval is 5 mins and max
> concurrent number is 3.
>
> Please let me know if it needs more information to find out what happened
> on my streaming job. Thanks for your help.
>
> Best Regards,
> Tony Wei
>

Re: Stream Task seems to be blocked after checkpoint timeout

Posted by Stefan Richter <s....@data-artisans.com>.
Sure, I opened Jira FLINK-7757 and this PR: https://github.com/apache/flink/pull/4764 <https://github.com/apache/flink/pull/4764> .

Best,
Stefan

> Am 03.10.2017 um 10:25 schrieb Tony Wei <to...@gmail.com>:
> 
> Hi Stefan,
> 
> Thank you very much. I will try to investigate what's the problem on my cluster and S3.
> BTW, Is there any Jira issue associated with your improvement, so that I can track it?
> 
> Best Regards,
> Tony Wei
> 
> 2017-10-03 16:01 GMT+08:00 Stefan Richter <s.richter@data-artisans.com <ma...@data-artisans.com>>:
> Hi,
> 
> from the stack trace, it seems to me like closing the checkpoint output stream to S3 is the culprit:
> 
> "pool-55-thread-7" #458829 prio=5 os_prio=0 tid=0x00007fda180c4000 nid=0x55a2 waiting on condition [0x00007fda092d7000]
>    java.lang.Thread.State: WAITING (parking)
> 	at sun.misc.Unsafe.park(Native Method)
> 	- parking to wait for  <0x00000007154050b8> (a java.util.concurrent.FutureTask)
> 	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> 	at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:429)
> 	at java.util.concurrent.FutureTask.get(FutureTask.java:191)
> 	at com.amazonaws.services.s3.transfer.internal.UploadImpl.waitForUploadResult(UploadImpl.java:66)
> 	at org.apache.hadoop.fs.s3a.S3AOutputStream.close(S3AOutputStream.java:131)
> 	- locked <0x00000007154801d0> (a org.apache.hadoop.fs.s3a.S3AOutputStream)
> 	at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
> 	at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
> 	at org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream.close(HadoopDataOutputStream.java:48)
> 	at org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
> 	at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:319)
> 	- locked <0x0000000715480238> (a org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream)
> 	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullSnapshotOperation.closeSnapshotStreamAndGetHandle(RocksDBKeyedStateBackend.java:693)
> 	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullSnapshotOperation.closeCheckpointStream(RocksDBKeyedStateBackend.java:531)
> 	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$3.performOperation(RocksDBKeyedStateBackend.java:420)
> 	- locked <0x000000073ef55b00> (a org.apache.flink.runtime.util.SerializableObject)
> 	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$3.performOperation(RocksDBKeyedStateBackend.java:399)
> 	at org.apache.flink.runtime.io <http://org.apache.flink.runtime.io/>.async.AbstractAsyncIOCallable.call(AbstractAsyncIOCallable.java:72)
> 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 	at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
> 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> 	at java.lang.Thread.run(Thread.java:748)
> 
> In particular, this holds lock 0x000000073ef55b00, which blocks the next checkpoint in it’s synchronous phase:
> 
> "count-with-timeout-window -> s3-uploader -> Sink: meta-store-committer (7/12)" #454093 daemon prio=5 os_prio=0 tid=0x00007fda28040000 nid=0x2f3b waiting for monitor entry [0x00007fda0a5e8000]
>    java.lang.Thread.State: BLOCKED (on object monitor)
> 	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.snapshotFully(RocksDBKeyedStateBackend.java:379)
> 	- waiting to lock <0x000000073ef55b00> (a org.apache.flink.runtime.util.SerializableObject)
> 	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.snapshot(RocksDBKeyedStateBackend.java:317)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:397)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1162)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1094)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:654)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:590)
> 	- locked <0x000000073ee55068> (a java.lang.Object)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:543)
> 	at org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:378)
> 	at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:281)
> 	at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:183)
> 	at org.apache.flink.streaming.runtime.io <http://runtime.io/>.StreamInputProcessor.processInput(StreamInputProcessor.java:213)
> 	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
> 	at java.lang.Thread.run(Thread.java:748)
> 
> This, in turn, blocks the operators main processing loop (I marked it further down the trace) and processing stops.
> 
> So while I assume that something bad happens with your S3, it is also not nice that this brings down the pipeline in such ways.
> 
> I had actually already created a branch that aims to improve (=reduce) the whole locking in the RockDBKeyedStateBackend to prevent exactly such scenarios: 
> Right now, the whole purpose of this lock is protecting the RocksDB instance from getting disposed while concurrent operations, such as checkpoints, are still running. Protecting the RocksDB instance is important because it is a native library and accessing a disposed instance will cause segfaults. 
> However, it is actually not required to hold on to the lock all the time. The idea behind my change is to have a synchronized client counter to track all ongoing workers that use the RocksDB instance, so only incrementing, decrementing, and checking the counter happens under the lock. Disposing the RocksDB instance can then only start when the „client count“ is zero, and after it started, no new clients can register. So it is similar to reader/writer locking, where all ops on the DB are „reader" and disposing the instance is the „writer".
> 
> I am currently on holidays, maybe this small change is quiet useful and I will prioritize it a bit when I am back. Nevertheless, I suggest to investigate why S3 is behaving like this.
> 
> Best,
> Stefan
> 
> 
> 
>> Am 03.10.2017 um 07:26 schrieb Tony Wei <tony19920430@gmail.com <ma...@gmail.com>>:
>> 
>> Hi Stefan,
>> 
>> It seems that the similar situation, in which job blocked after checkpoint timeout, came across to my job. BTW, this is another job that I raised parallelism and throughput of input.
>> 
>> After chk #8 started, the whole operator seems blocked.
>> 
>> I recorded some JM / TM logs, snapshots and thread dump logs, which the attachment is. Hope these will help to find the root cause. Thank you.
>> 
>> Best Regards,
>> Tony Wie
>> 
>> ==========================================================================================================================================================
>> 
>> JM log:
>> 
>> 2017-10-03 03:46:49,371 WARN  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Received late message for now expired checkpoint attempt 7 from b52ef54ad4feb0c6b85a8b8453bff419 of job ecfa5968e831e547ed70d1359a615f72.
>> 2017-10-03 03:47:00,977 WARN  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Received late message for now expired checkpoint attempt 8 from b52ef54ad4feb0c6b85a8b8453bff419 of job ecfa5968e831e547ed70d1359a615f72.
>> 
>> TM log:
>> 
>> 2017-10-03 03:46:46,962 INFO  org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - Asynchronous RocksDB snapshot (File Stream Factory @ s3://tony-dev/flink-checkpoints/ecfa5968e831e547ed70d1359a615f72 <>, asynchronous part) in thread Thread[pool-55-thread-7,5,Flink Task Threads] took 1211517 ms.
>> 
>> Snapshots:
>> 
>> <???? 2017-10-03  下午12.20.11.png>
>> <???? 2017-10-03  下午12.22.10.png>
>> 
>> 2017-09-28 20:29 GMT+08:00 Tony Wei <tony19920430@gmail.com <ma...@gmail.com>>:
>> Hi Stefan,
>> 
>> That reason makes sense to me. Thanks for point me out. 
>> 
>> About my job, the database currently was never used, I disabled it for some reasons, but output to s3 was implemented by async io. 
>> 
>> I used ForkJoinPool with 50 capacity. 
>> I have tried to rebalance after count window to monitor the back pressure on upload operator. 
>> The result is always OK status. 
>> I think the reason is due to that count window buffered lots of records, so the input rate in upload operator was not too high. 
>> 
>> But I am not sure that if the setup for my capacity of ForkJoinPool would impact the process asynchronous checkpoints both machine's resources and s3 connection. 
>> 
>> BTW, s3 serves both operator and checkpointing and I used aws java api to access s3 in upload operator in order to control where the files go. 
>> 
>> Best Regards,
>> Tony Wei
>> 
>> Stefan Richter <s.richter@data-artisans.com <ma...@data-artisans.com>>於 2017年9月28日 週四,下午7:43寫道:
>> Hi,
>> 
>> the gap between the sync and the async part does not mean too much. What happens per task is that all operators go through their sync part, and then one thread executes all the async parts, one after the other. So if an async part starts late, this is just because it started only after another async part finished.
>> 
>> I have one more question about your job,because it involves communication with external systems, like S3 and a database. Are you sure that they cannot sometimes become a bottleneck, block, and bring down your job. in particular: is the same S3 used to serve the operator and checkpointing and what is your sustained read/write rate there and the maximum number of connections? You can try to use the backpressure metric and try to identify the first operator (counting from the sink) that indicates backpressure.
>> 
>> Best,
>> Stefan
>> 
>> 
>>> Am 28.09.2017 um 12:59 schrieb Tony Wei <tony19920430@gmail.com <ma...@gmail.com>>:
>>> 
>> 
>>> Hi,
>>> 
>>> Sorry. This is the correct one.
>>> 
>>> Best Regards,
>>> Tony Wei
>>> 
>>> 2017-09-28 18:55 GMT+08:00 Tony Wei <tony19920430@gmail.com <ma...@gmail.com>>:
>>> Hi Stefan, 
>>> 
>>> Sorry for providing partial information. The attachment is the full logs for checkpoint #1577.
>>> 
>>> Why I would say it seems that asynchronous part was not executed immediately is due to all synchronous parts were all finished at 2017-09-27 13:49.
>>> Did that mean the checkpoint barrier event had already arrived at the operator and started as soon as when the JM triggered the checkpoint?
>>> 
>>> Best Regards,
>>> Tony Wei
>>> 
>>> 2017-09-28 18:22 GMT+08:00 Stefan Richter <s.richter@data-artisans.com <ma...@data-artisans.com>>:
>>> Hi,
>>> 
>>> I agree that the memory consumption looks good. If there is only one TM, it will run inside one JVM. As for the 7 minutes, you mean the reported end-to-end time? This time measurement starts when the checkpoint is triggered on the job manager, the first contributor is then the time that it takes for the checkpoint barrier event to travel with the stream to the operators. If there is back pressure and a lot of events are buffered, this can introduce delay to this first part, because barriers must not overtake data for correctness. After the barrier arrives at the operator, next comes the synchronous part of the checkpoint, which is typically short running and takes a snapshot of the state (think of creating an immutable version, e.g. through copy on write). In the asynchronous part, this snapshot is persisted to DFS. After that the timing stops and is reported together with the acknowledgement to the job manager. 
>>> 
>>> So, I would assume if reporting took 7 minutes end-to-end, and the async part took 4 minutes, it is likely that it took around 3 minutes for the barrier event to travel with the stream. About the debugging, I think it is hard to figure out what is going on with the DFS if you don’t have metrics on that. Maybe you could attach a sampler to the TM’s jvm and monitor where time is spend for the snapshotting?
>>> 
>>> I am also looping in Stephan, he might have more suggestions.
>>> 
>>> Best,
>>> Stefan
>>> 
>>>> Am 28.09.2017 um 11:25 schrieb Tony Wei <tony19920430@gmail.com <ma...@gmail.com>>:
>>>> 
>>>> Hi Stefan,
>>>> 
>>>> These are some telemetry information, but I don't have history information about gc.
>>>> 
>>>> <???? 2017-09-2 8 下午4.51.26.png>
>>>> <???? 2017-09-2 8 下午4.51.11.png>
>>>> 
>>>> 1) Yes, my state is not large.
>>>> 2) My DFS is S3, but my cluster is out of AWS. It might be a problem. Since this is a POC, we might move to AWS in the future or use HDFS in the same cluster. However, how can I recognize the problem is this.
>>>> 3) It seems memory usage is bounded. I'm not sure if the status showed above is fine.
>>>> 
>>>> There is only one TM in my cluster for now, so all tasks are running on that machine. I think that means they are in the same JVM, right?
>>>> Besides taking so long on asynchronous part, there is another question is that the late message showed that this task was delay for almost 7 minutes, but the log showed it only took 4 minutes.
>>>> It seems that it was somehow waiting for being executed. Are there some points to find out what happened?
>>>> 
>>>> For the log information, what I means is it is hard to recognize which checkpoint id that asynchronous parts belong to if the checkpoint takes more time and there are more concurrent checkpoints taking place.
>>>> Also, it seems that asynchronous part might be executed right away if there is no resource from thread pool. It is better to measure the time between creation time and processing time, and log it and checkpoint id with the original log that showed what time the asynchronous part took.
>>>> 
>>>> Best Regards,
>>>> Tony Wei
>>>> 
>>>> 2017-09-28 16:25 GMT+08:00 Stefan Richter <s.richter@data-artisans.com <ma...@data-artisans.com>>:
>>>> Hi,
>>>> 
>>>> when the async part takes that long I would have 3 things to look at:
>>>> 
>>>> 1) Is your state so large? I don’t think this applies in your case, right?
>>>> 2) Is something wrong with writing to DFS (network, disks, etc)?
>>>> 3) Are we running low on memory on that task manager?
>>>> 
>>>> Do you have telemetry information about used heap and gc pressure on the problematic task? However, what speaks against the memory problem hypothesis is that future checkpoints seem to go through again. What I find very strange is that within the reported 4 minutes of the async part the only thing that happens is: open dfs output stream, iterate the in-memory state and write serialized state data to dfs stream, then close the stream. No locks or waits in that section, so I would assume that for one of the three reasons I gave, writing the state is terribly slow.
>>>> 
>>>> Those snapshots should be able to run concurrently, for example so that users can also take savepoints  even when a checkpoint was triggered and is still running, so there is no way to guarantee that the previous parts have finished, this is expected behaviour. Which waiting times are you missing in the log? I think the information about when a checkpoint is triggered, received by the TM, performing the sync and async part and acknowledgement time should all be there?.
>>>> 
>>>> Best,
>>>> Stefan
>>>> 
>>>> 
>>>> 
>>>>> Am 28.09.2017 um 08:18 schrieb Tony Wei <tony19920430@gmail.com <ma...@gmail.com>>:
>>>>> 
>>>>> Hi Stefan,
>>>>> 
>>>>> The checkpoint on my job has been subsumed again. There are some questions that I don't understand.
>>>>> 
>>>>> Log in JM :
>>>>> 2017-09-27 13:45:15,686 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 1576 (174693180 bytes in 21597 ms).
>>>>> 2017-09-27 13:49:42,795 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1577 @ 1506520182795
>>>>> 2017-09-27 13:54:42,795 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1578 @ 1506520482795
>>>>> 2017-09-27 13:55:13,105 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 1578 (152621410 bytes in 19109 ms).
>>>>> 2017-09-27 13:56:37,103 WARN org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late message for now expired checkpoint attempt 1577 from 2273da50f29b9dee731f7bd749e91c80 of job 7c039572b....
>>>>> 2017-09-27 13:59:42,795 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1579 @ 1506520782795
>>>>> 
>>>>> Log in TM:
>>>>> 2017-09-27 13:56:37,105 INFO org.apache.flink.runtime.state.DefaultOperatorStateBackend - DefaultOperatorStateBackend snapshot (File Stream Factory @ s3://tony-dev/flink- <>checkpoints/7c039572b13346f1b17dcc0ace2b72c2, asynchronous part) in thread Thread[pool-7-thread-322,5,Flink Task Threads] took 240248 ms.
>>>>> 
>>>>> I think the log in TM might be the late message for #1577 in JM, because #1576, #1578 had been finished and #1579 hadn't been started at 13:56:37.
>>>>> If there is no mistake on my words, I am wondering why the time it took was 240248 ms (4 min). It seems that it started late than asynchronous tasks in #1578.
>>>>> Is there any way to guarantee the previous asynchronous parts of checkpoints will be executed before the following.
>>>>> 
>>>>> Moreover, I think it will be better to have more information in INFO log, such as waiting time and checkpoint id, in order to trace the progress of checkpoint conveniently.
>>>>> 
>>>>> What do you think? Do you have any suggestion for me to deal with these problems? Thank you.
>>>>> 
>>>>> Best Regards,
>>>>> Tony Wei
>>>>> 
>>>>> 2017-09-27 17:11 GMT+08:00 Tony Wei <tony19920430@gmail.com <ma...@gmail.com>>:
>>>>> Hi Stefan,
>>>>> 
>>>>> Here is the summary for my streaming job's checkpoint after restarting at last night.
>>>>> 
>>>>> <???? 2017-09-2 7 下午4.56.30.png>
>>>>> 
>>>>> This is the distribution of alignment buffered from the last 12 hours.
>>>>> 
>>>>> <???? 2017-09-2 7 下午5.05.11.png>
>>>>> 
>>>>> And here is the buffer out pool usage during chk #1140 ~ #1142. For chk #1245 and #1246, you can check the picture I sent before.
>>>>> 
>>>>>  <???? 2017-09-2 7 下午5.01.24.png>
>>>>> 
>>>>> AFAIK, the back pressure rate usually is in LOW status, sometimes goes up to HIGH, and always OK during the night.
>>>>> 
>>>>> Best Regards,
>>>>> Tony Wei
>>>>> 
>>>>> 
>>>>> 2017-09-27 16:54 GMT+08:00 Stefan Richter <s.richter@data-artisans.com <ma...@data-artisans.com>>:
>>>>> Hi Tony,
>>>>> 
>>>>> are your checkpoints typically close to the timeout boundary? From what I see, writing the checkpoint is relatively fast but the time from the checkpoint trigger to execution seems very long. This is typically the case if your job has a lot of backpressure and therefore the checkpoint barriers take a long time to travel to the operators, because a lot of events are piling up in the buffers. Do you also experience large alignments for your checkpoints?
>>>>> 
>>>>> Best,
>>>>> Stefan  
>>>>> 
>>>>>> Am 27.09.2017 um 10:43 schrieb Tony Wei <tony19920430@gmail.com <ma...@gmail.com>>:
>>>>>> 
>>>>>> Hi Stefan,
>>>>>> 
>>>>>> It seems that I found something strange from JM's log.
>>>>>> 
>>>>>> It had happened more than once before, but all subtasks would finish their checkpoint attempts in the end.
>>>>>> 
>>>>>> 2017-09-26 01:23:28,690 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1140 @ 1506389008690
>>>>>> 2017-09-26 01:28:28,690 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1141 @ 1506389308690
>>>>>> 2017-09-26 01:33:28,690 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1142 @ 1506389608690
>>>>>> 2017-09-26 01:33:28,691 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 1140 expired before completing.
>>>>>> 2017-09-26 01:38:28,691 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 1141 expired before completing.
>>>>>> 2017-09-26 01:40:38,044 WARN org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late message for now expired checkpoint attempt 1140 from c63825d15de0fef55a1d148adcf4467e of job 7c039572b...
>>>>>> 2017-09-26 01:40:53,743 WARN org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late message for now expired checkpoint attempt 1141 from c63825d15de0fef55a1d148adcf4467e of job 7c039572b...
>>>>>> 2017-09-26 01:41:19,332 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 1142 (136733704 bytes in 457413 ms).
>>>>>> 
>>>>>> For chk #1245 and #1246, there was no late message from TM. You can refer to the TM log. The full completed checkpoint attempt will have 12 (... asynchronous part) logs in general, but #1245 and #1246 only got 10 logs.
>>>>>> 
>>>>>> 2017-09-26 10:08:28,690 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1245 @ 1506420508690
>>>>>> 2017-09-26 10:13:28,690 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1246 @ 1506420808690
>>>>>> 2017-09-26 10:18:28,691 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 1245 expired before completing.
>>>>>> 2017-09-26 10:23:28,691 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 1246 expired before completing.
>>>>>> 
>>>>>> Moreover, I listed the directory for checkpoints on S3 and saw there were two states not discarded successfully. In general, there will be 16 parts for a completed checkpoint state.
>>>>>> 
>>>>>> 2017-09-26 18:08:33 36919 tony-dev/flink-checkpoints/7c039572b13346f1b17dcc0ace2b72c2/chk-1245/eedd7ca5-ee34-45a5-bf0b-11cc1fc67ab8
>>>>>> 2017-09-26 18:13:34 37419 tony-dev/flink-checkpoints/7c039572b13346f1b17dcc0ace2b72c2/chk-1246/9aa5c6c4-8c74-465d-8509-5fea4ed25af6
>>>>>> 
>>>>>> Hope these informations are helpful. Thank you.
>>>>>> 
>>>>>> Best Regards,
>>>>>> Tony Wei
>>>>>> 
>>>>>> 2017-09-27 16:14 GMT+08:00 Stefan Richter <s.richter@data-artisans.com <ma...@data-artisans.com>>:
>>>>>> Hi,
>>>>>> 
>>>>>> thanks for the information. Unfortunately, I have no immediate idea what the reason is from the given information. I think most helpful could be a thread dump, but also metrics on the operator operator level to figure out which part of the pipeline is the culprit.
>>>>>> 
>>>>>> Best,
>>>>>> Stefan
>>>>>> 
>>>>>>> Am 26.09.2017 um 17:55 schrieb Tony Wei <tony19920430@gmail.com <ma...@gmail.com>>:
>>>>>>> 
>>>>>>> Hi Stefan,
>>>>>>> 
>>>>>>> There is no unknown exception in my full log. The Flink version is 1.3.2.
>>>>>>> My job is roughly like this.
>>>>>>> 
>>>>>>> env.addSource(Kafka)
>>>>>>>   .map(ParseKeyFromRecord)
>>>>>>>   .keyBy()
>>>>>>>   .process(CountAndTimeoutWindow)
>>>>>>>   .asyncIO(UploadToS3)
>>>>>>>   .addSink(UpdateDatabase)
>>>>>>> 
>>>>>>> It seemed all tasks stopped like the picture I sent in the last email.
>>>>>>> 
>>>>>>> I will keep my eye on taking a thread dump from that JVM if this happens again.
>>>>>>> 
>>>>>>> Best Regards,
>>>>>>> Tony Wei
>>>>>>> 
>>>>>>> 2017-09-26 23:46 GMT+08:00 Stefan Richter <s.richter@data-artisans.com <ma...@data-artisans.com>>:
>>>>>>> Hi,
>>>>>>> 
>>>>>>> that is very strange indeed. I had a look at the logs and there is no error or exception reported. I assume there is also no exception in your full logs? Which version of flink are you using and what operators were running in the task that stopped? If this happens again, would it be possible to take a thread dump from that JVM?
>>>>>>> 
>>>>>>> Best,
>>>>>>> Stefan
>>>>>>> 
>>>>>>> > Am 26.09.2017 um 17:08 schrieb Tony Wei <tony19920430@gmail.com <ma...@gmail.com>>:
>>>>>>> >
>>>>>>> > Hi,
>>>>>>> >
>>>>>>> > Something weird happened on my streaming job.
>>>>>>> >
>>>>>>> > I found my streaming job seems to be blocked for a long time and I saw the situation like the picture below. (chk #1245 and #1246 were all finishing 7/8 tasks then marked timeout by JM. Other checkpoints failed with the same state like #1247 util I restarted TM.)
>>>>>>> >
>>>>>>> > <snapshot.png>
>>>>>>> >
>>>>>>> > I'm not sure what happened, but the consumer stopped fetching records, buffer usage is 100% and the following task did not seem to fetch data anymore. Just like the whole TM was stopped.
>>>>>>> >
>>>>>>> > However, after I restarted TM and force the job restarting from the latest completed checkpoint, everything worked again. And I don't know how to reproduce it.
>>>>>>> >
>>>>>>> > The attachment is my TM log. Because there are many user logs and sensitive information, I only remain the log from `org.apache.flink...`.
>>>>>>> >
>>>>>>> > My cluster setting is one JM and one TM with 4 available slots.
>>>>>>> >
>>>>>>> > Streaming job uses all slots, checkpoint interval is 5 mins and max concurrent number is 3.
>>>>>>> >
>>>>>>> > Please let me know if it needs more information to find out what happened on my streaming job. Thanks for your help.
>>>>>>> >
>>>>>>> > Best Regards,
>>>>>>> > Tony Wei
>>>>>>> > <flink-root-taskmanager-0-partial.log>
>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>> 
>>>> 
>>> 
>>> 
>>> 
>> 
>>> <chk_ 1577.log>
>> 
>> 
>> <threaddumps.log>
> 
> 


Re: Stream Task seems to be blocked after checkpoint timeout

Posted by Tony Wei <to...@gmail.com>.
Hi Stefan,

Thank you very much. I will try to investigate what's the problem on my
cluster and S3.
BTW, Is there any Jira issue associated with your improvement, so that I
can track it?

Best Regards,
Tony Wei

2017-10-03 16:01 GMT+08:00 Stefan Richter <s....@data-artisans.com>:

> Hi,
>
> from the stack trace, it seems to me like closing the checkpoint output
> stream to S3 is the culprit:
>
> "pool-55-thread-7" #458829 prio=5 os_prio=0 tid=0x00007fda180c4000
> nid=0x55a2 waiting on condition [0x00007fda092d7000]
>    java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for  <0x00000007154050b8> (a java.util.concurrent.
> FutureTask)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:429)
> at java.util.concurrent.FutureTask.get(FutureTask.java:191)
> * at
> com.amazonaws.services.s3.transfer.internal.UploadImpl.waitForUploadResult(UploadImpl.java:66)*
> * at
> org.apache.hadoop.fs.s3a.S3AOutputStream.close(S3AOutputStream.java:131)*
> - locked <0x00000007154801d0> (a org.apache.hadoop.fs.s3a.S3AOutputStream)
> at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(
> FSDataOutputStream.java:72)
> at org.apache.hadoop.fs.FSDataOutputStream.close(
> FSDataOutputStream.java:106)
> at org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream.
> close(HadoopDataOutputStream.java:48)
> at org.apache.flink.core.fs.ClosingFSDataOutputStream.close(
> ClosingFSDataOutputStream.java:64)
> at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$
> FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.
> java:319)
> - locked <0x0000000715480238> (a org.apache.flink.runtime.
> state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream)
> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$
> RocksDBFullSnapshotOperation.closeSnapshotStreamAndGetHandl
> e(RocksDBKeyedStateBackend.java:693)
> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$
> RocksDBFullSnapshotOperation.closeCheckpointStream(
> RocksDBKeyedStateBackend.java:531)
> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$3.
> performOperation(RocksDBKeyedStateBackend.java:420)
> - locked <0x000000073ef55b00> (a org.apache.flink.runtime.util.
> SerializableObject)
> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$3.
> performOperation(RocksDBKeyedStateBackend.java:399)
> at org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.
> call(AbstractAsyncIOCallable.java:72)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
> at org.apache.flink.streaming.runtime.tasks.StreamTask$
> AsyncCheckpointRunnable.run(StreamTask.java:897)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
>
> In particular, this holds lock 0x000000073ef55b00, which blocks the next
> checkpoint in it’s synchronous phase:
>
> "count-with-timeout-window -> s3-uploader -> Sink: meta-store-committer
> (7/12)" #454093 daemon prio=5 os_prio=0 tid=0x00007fda28040000 nid=0x2f3b
> waiting for monitor entry [0x00007fda0a5e8000]
>    java.lang.Thread.State: BLOCKED (on object monitor)
> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.
> snapshotFully(RocksDBKeyedStateBackend.java:379)
> *- waiting to lock <0x000000073ef55b00> (a
> org.apache.flink.runtime.util.SerializableObject)*
> * at
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.snapshot(RocksDBKeyedStateBackend.java:317)*
> * at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:397)*
> at org.apache.flink.streaming.runtime.tasks.StreamTask$
> CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1162)
> at org.apache.flink.streaming.runtime.tasks.StreamTask$
> CheckpointingOperation.executeCheckpointing(StreamTask.java:1094)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> checkpointState(StreamTask.java:654)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> performCheckpoint(StreamTask.java:590)
> - locked <0x000000073ee55068> (a java.lang.Object)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> triggerCheckpointOnBarrier(StreamTask.java:543)
> at org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(
> BarrierBuffer.java:378)
> at org.apache.flink.streaming.runtime.io.BarrierBuffer.
> processBarrier(BarrierBuffer.java:281)
> at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(
> BarrierBuffer.java:183)
> * at org.apache.flink.streaming.runtime.io
> <http://runtime.io>.StreamInputProcessor.processInput(StreamInputProcessor.java:213)*
> * at
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)*
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:263)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
> at java.lang.Thread.run(Thread.java:748)
>
> This, in turn, blocks the operators main processing loop (I marked it
> further down the trace) and processing stops.
>
> So while I assume that something bad happens with your S3, it is also not
> nice that this brings down the pipeline in such ways.
>
> I had actually already created a branch that aims to improve (=reduce) the
> whole locking in the RockDBKeyedStateBackend to prevent exactly such
> scenarios:
> Right now, the whole purpose of this lock is protecting the RocksDB
> instance from getting disposed while concurrent operations, such as
> checkpoints, are still running. Protecting the RocksDB instance is
> important because it is a native library and accessing a disposed instance
> will cause segfaults.
> However, it is actually not required to hold on to the lock all the time.
> The idea behind my change is to have a synchronized client counter to track
> all ongoing workers that use the RocksDB instance, so only incrementing,
> decrementing, and checking the counter happens under the lock. Disposing
> the RocksDB instance can then only start when the „client count“ is zero,
> and after it started, no new clients can register. So it is similar to
> reader/writer locking, where all ops on the DB are „reader" and disposing
> the instance is the „writer".
>
> I am currently on holidays, maybe this small change is quiet useful and I
> will prioritize it a bit when I am back. Nevertheless, I suggest to
> investigate why S3 is behaving like this.
>
> Best,
> Stefan
>
>
>
> Am 03.10.2017 um 07:26 schrieb Tony Wei <to...@gmail.com>:
>
> Hi Stefan,
>
> It seems that the similar situation, in which job blocked after checkpoint
> timeout, came across to my job. BTW, this is another job that I raised
> parallelism and throughput of input.
>
> After chk #8 started, the whole operator seems blocked.
>
> I recorded some JM / TM logs, snapshots and thread dump logs, which the
> attachment is. Hope these will help to find the root cause. Thank you.
>
> Best Regards,
> Tony Wie
>
> ============================================================
> ============================================================
> ==================================
>
> JM log:
>
> 2017-10-03 03:46:49,371 WARN  org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>     - Received late message for now expired checkpoint attempt 7 from
> b52ef54ad4feb0c6b85a8b8453bff419 of job ecfa5968e831e547ed70d1359a615f72.
> 2017-10-03 03:47:00,977 WARN  org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>     - Received late message for now expired checkpoint attempt 8 from
> b52ef54ad4feb0c6b85a8b8453bff419 of job ecfa5968e831e547ed70d1359a615f72.
>
> TM log:
>
> 2017-10-03 03:46:46,962 INFO  org.apache.flink.contrib.streaming.state.
> RocksDBKeyedStateBackend  - Asynchronous RocksDB snapshot (File Stream
> Factory @ s3://tony-dev/flink-checkpoints/ecfa5968e831e547ed70d1359a615f72,
> asynchronous part) in thread Thread[pool-55-thread-7,5,Flink Task
> Threads] took 1211517 ms.
>
> Snapshots:
>
> <???? 2017-10-03  下午12.20.11.png>
> <???? 2017-10-03  下午12.22.10.png>
>
> 2017-09-28 20:29 GMT+08:00 Tony Wei <to...@gmail.com>:
>
>> Hi Stefan,
>>
>> That reason makes sense to me. Thanks for point me out.
>>
>> About my job, the database currently was never used, I disabled it for
>> some reasons, but output to s3 was implemented by async io.
>>
>> I used ForkJoinPool with 50 capacity.
>> I have tried to rebalance after count window to monitor the back pressure
>> on upload operator.
>> The result is always OK status.
>> I think the reason is due to that count window buffered lots of records,
>> so the input rate in upload operator was not too high.
>>
>> But I am not sure that if the setup for my capacity of ForkJoinPool would
>> impact the process asynchronous checkpoints both machine's resources and s3
>> connection.
>>
>> BTW, s3 serves both operator and checkpointing and I used aws java api to
>> access s3 in upload operator in order to control where the files go.
>>
>> Best Regards,
>> Tony Wei
>>
>> Stefan Richter <s....@data-artisans.com>於 2017年9月28日 週四,下午7:43寫道:
>>
>>> Hi,
>>>
>>> the gap between the sync and the async part does not mean too much. What
>>> happens per task is that all operators go through their sync part, and then
>>> one thread executes all the async parts, one after the other. So if an
>>> async part starts late, this is just because it started only after another
>>> async part finished.
>>>
>>> I have one more question about your job,because it involves
>>> communication with external systems, like S3 and a database. Are you sure
>>> that they cannot sometimes become a bottleneck, block, and bring down your
>>> job. in particular: is the same S3 used to serve the operator and
>>> checkpointing and what is your sustained read/write rate there and the
>>> maximum number of connections? You can try to use the backpressure metric
>>> and try to identify the first operator (counting from the sink) that
>>> indicates backpressure.
>>>
>>> Best,
>>> Stefan
>>>
>>> Am 28.09.2017 um 12:59 schrieb Tony Wei <to...@gmail.com>:
>>>
>>> Hi,
>>>
>>> Sorry. This is the correct one.
>>>
>>> Best Regards,
>>> Tony Wei
>>>
>>> 2017-09-28 18:55 GMT+08:00 Tony Wei <to...@gmail.com>:
>>>
>>>> Hi Stefan,
>>>>
>>>> Sorry for providing partial information. The attachment is the full
>>>> logs for checkpoint #1577.
>>>>
>>>> Why I would say it seems that asynchronous part was not executed
>>>> immediately is due to all synchronous parts were all finished at 2017-09-27
>>>> 13:49.
>>>> Did that mean the checkpoint barrier event had already arrived at the
>>>> operator and started as soon as when the JM triggered the checkpoint?
>>>>
>>>> Best Regards,
>>>> Tony Wei
>>>>
>>>> 2017-09-28 18:22 GMT+08:00 Stefan Richter <s....@data-artisans.com>
>>>> :
>>>>
>>>>> Hi,
>>>>>
>>>>> I agree that the memory consumption looks good. If there is only one
>>>>> TM, it will run inside one JVM. As for the 7 minutes, you mean the reported
>>>>> end-to-end time? This time measurement starts when the checkpoint is
>>>>> triggered on the job manager, the first contributor is then the time that
>>>>> it takes for the checkpoint barrier event to travel with the stream to the
>>>>> operators. If there is back pressure and a lot of events are buffered, this
>>>>> can introduce delay to this first part, because barriers must not overtake
>>>>> data for correctness. After the barrier arrives at the operator, next comes
>>>>> the synchronous part of the checkpoint, which is typically short running
>>>>> and takes a snapshot of the state (think of creating an immutable version,
>>>>> e.g. through copy on write). In the asynchronous part, this snapshot is
>>>>> persisted to DFS. After that the timing stops and is reported together with
>>>>> the acknowledgement to the job manager.
>>>>>
>>>>> So, I would assume if reporting took 7 minutes end-to-end, and the
>>>>> async part took 4 minutes, it is likely that it took around 3 minutes for
>>>>> the barrier event to travel with the stream. About the debugging, I think
>>>>> it is hard to figure out what is going on with the DFS if you don’t have
>>>>> metrics on that. Maybe you could attach a sampler to the TM’s jvm and
>>>>> monitor where time is spend for the snapshotting?
>>>>>
>>>>> I am also looping in Stephan, he might have more suggestions.
>>>>>
>>>>> Best,
>>>>> Stefan
>>>>>
>>>>> Am 28.09.2017 um 11:25 schrieb Tony Wei <to...@gmail.com>:
>>>>>
>>>>> Hi Stefan,
>>>>>
>>>>> These are some telemetry information, but I don't have history
>>>>> information about gc.
>>>>>
>>>>> <???? 2017-09-2 8 下午4.51.26.png>
>>>>> <???? 2017-09-2 8 下午4.51.11.png>
>>>>>
>>>>> 1) Yes, my state is not large.
>>>>> 2) My DFS is S3, but my cluster is out of AWS. It might be a problem.
>>>>> Since this is a POC, we might move to AWS in the future or use HDFS in the
>>>>> same cluster. However, how can I recognize the problem is this.
>>>>> 3) It seems memory usage is bounded. I'm not sure if the status showed
>>>>> above is fine.
>>>>>
>>>>> There is only one TM in my cluster for now, so all tasks are running
>>>>> on that machine. I think that means they are in the same JVM, right?
>>>>> Besides taking so long on asynchronous part, there is another question
>>>>> is that the late message showed that this task was delay for almost 7
>>>>> minutes, but the log showed it only took 4 minutes.
>>>>> It seems that it was somehow waiting for being executed. Are there
>>>>> some points to find out what happened?
>>>>>
>>>>> For the log information, what I means is it is hard to recognize which
>>>>> checkpoint id that asynchronous parts belong to if the checkpoint takes
>>>>> more time and there are more concurrent checkpoints taking place.
>>>>> Also, it seems that asynchronous part might be executed right away if
>>>>> there is no resource from thread pool. It is better to measure the time
>>>>> between creation time and processing time, and log it and checkpoint id
>>>>> with the original log that showed what time the asynchronous part took.
>>>>>
>>>>> Best Regards,
>>>>> Tony Wei
>>>>>
>>>>> 2017-09-28 16:25 GMT+08:00 Stefan Richter <s.richter@data-artisans.com
>>>>> >:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> when the async part takes that long I would have 3 things to look at:
>>>>>>
>>>>>> 1) Is your state so large? I don’t think this applies in your case,
>>>>>> right?
>>>>>> 2) Is something wrong with writing to DFS (network, disks, etc)?
>>>>>> 3) Are we running low on memory on that task manager?
>>>>>>
>>>>>> Do you have telemetry information about used heap and gc pressure on
>>>>>> the problematic task? However, what speaks against the memory problem
>>>>>> hypothesis is that future checkpoints seem to go through again. What I find
>>>>>> very strange is that within the reported 4 minutes of the async part the
>>>>>> only thing that happens is: open dfs output stream, iterate the in-memory
>>>>>> state and write serialized state data to dfs stream, then close the stream.
>>>>>> No locks or waits in that section, so I would assume that for one of the
>>>>>> three reasons I gave, writing the state is terribly slow.
>>>>>>
>>>>>> Those snapshots should be able to run concurrently, for example so
>>>>>> that users can also take savepoints  even when a checkpoint was triggered
>>>>>> and is still running, so there is no way to guarantee that the previous
>>>>>> parts have finished, this is expected behaviour. Which waiting times are
>>>>>> you missing in the log? I think the information about when a checkpoint is
>>>>>> triggered, received by the TM, performing the sync and async part and
>>>>>> acknowledgement time should all be there?.
>>>>>>
>>>>>> Best,
>>>>>> Stefan
>>>>>>
>>>>>>
>>>>>>
>>>>>> Am 28.09.2017 um 08:18 schrieb Tony Wei <to...@gmail.com>:
>>>>>>
>>>>>> Hi Stefan,
>>>>>>
>>>>>> The checkpoint on my job has been subsumed again. There are some
>>>>>> questions that I don't understand.
>>>>>>
>>>>>> Log in JM :
>>>>>> 2017-09-27 13:45:15,686 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>>>> - Completed checkpoint 1576 (174693180 bytes in 21597 ms).
>>>>>> 2017-09-27 13:49:42,795 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>>>> - Triggering checkpoint 1577 @ 1506520182795
>>>>>> 2017-09-27 13:54:42,795 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>>>> - Triggering checkpoint 1578 @ 1506520482795
>>>>>> 2017-09-27 13:55:13,105 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>>>> - Completed checkpoint 1578 (152621410 bytes in 19109 ms).
>>>>>> 2017-09-27 13:56:37,103 WARN org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>>>> - Received late message for now expired checkpoint attempt 1577 from
>>>>>> 2273da50f29b9dee731f7bd749e91c80 of job 7c039572b....
>>>>>> 2017-09-27 13:59:42,795 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>>>> - Triggering checkpoint 1579 @ 1506520782795
>>>>>>
>>>>>> Log in TM:
>>>>>> 2017-09-27 13:56:37,105 INFO org.apache.flink.runtime.state.DefaultOperatorStateBackend
>>>>>> - DefaultOperatorStateBackend snapshot (File Stream Factory @
>>>>>> s3://tony-dev/flink-checkpoints/7c039572b13346f1b17dcc0ace2b72c2,
>>>>>> asynchronous part) in thread Thread[pool-7-thread-322,5,Flink Task
>>>>>> Threads] took 240248 ms.
>>>>>>
>>>>>> I think the log in TM might be the late message for #1577 in JM,
>>>>>> because #1576, #1578 had been finished and #1579 hadn't been started at
>>>>>> 13:56:37.
>>>>>> If there is no mistake on my words, I am wondering why the time it
>>>>>> took was 240248 ms (4 min). It seems that it started late than asynchronous
>>>>>> tasks in #1578.
>>>>>> Is there any way to guarantee the previous asynchronous parts of
>>>>>> checkpoints will be executed before the following.
>>>>>>
>>>>>> Moreover, I think it will be better to have more information in INFO
>>>>>> log, such as waiting time and checkpoint id, in order to trace the progress
>>>>>> of checkpoint conveniently.
>>>>>>
>>>>>> What do you think? Do you have any suggestion for me to deal with
>>>>>> these problems? Thank you.
>>>>>>
>>>>>> Best Regards,
>>>>>> Tony Wei
>>>>>>
>>>>>> 2017-09-27 17:11 GMT+08:00 Tony Wei <to...@gmail.com>:
>>>>>>
>>>>>>> Hi Stefan,
>>>>>>>
>>>>>>> Here is the summary for my streaming job's checkpoint after
>>>>>>> restarting at last night.
>>>>>>>
>>>>>>> <???? 2017-09-2 7 下午4.56.30.png>
>>>>>>>
>>>>>>> This is the distribution of alignment buffered from the last 12
>>>>>>> hours.
>>>>>>>
>>>>>>> <???? 2017-09-2 7 下午5.05.11.png>
>>>>>>>
>>>>>>> And here is the buffer out pool usage during chk #1140 ~ #1142. For
>>>>>>> chk #1245 and #1246, you can check the picture I sent before.
>>>>>>>
>>>>>>>  <???? 2017-09-2 7 下午5.01.24.png>
>>>>>>>
>>>>>>> AFAIK, the back pressure rate usually is in LOW status, sometimes
>>>>>>> goes up to HIGH, and always OK during the night.
>>>>>>>
>>>>>>> Best Regards,
>>>>>>> Tony Wei
>>>>>>>
>>>>>>>
>>>>>>> 2017-09-27 16:54 GMT+08:00 Stefan Richter <
>>>>>>> s.richter@data-artisans.com>:
>>>>>>>
>>>>>>>> Hi Tony,
>>>>>>>>
>>>>>>>> are your checkpoints typically close to the timeout boundary? From
>>>>>>>> what I see, writing the checkpoint is relatively fast but the time from the
>>>>>>>> checkpoint trigger to execution seems very long. This is typically the case
>>>>>>>> if your job has a lot of backpressure and therefore the checkpoint barriers
>>>>>>>> take a long time to travel to the operators, because a lot of events are
>>>>>>>> piling up in the buffers. Do you also experience large alignments for your
>>>>>>>> checkpoints?
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Stefan
>>>>>>>>
>>>>>>>> Am 27.09.2017 um 10:43 schrieb Tony Wei <to...@gmail.com>:
>>>>>>>>
>>>>>>>> Hi Stefan,
>>>>>>>>
>>>>>>>> It seems that I found something strange from JM's log.
>>>>>>>>
>>>>>>>> It had happened more than once before, but all subtasks would
>>>>>>>> finish their checkpoint attempts in the end.
>>>>>>>>
>>>>>>>> 2017-09-26 01:23:28,690 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>>>>>> - Triggering checkpoint 1140 @ 1506389008690
>>>>>>>> 2017-09-26 01:28:28,690 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>>>>>> - Triggering checkpoint 1141 @ 1506389308690
>>>>>>>> 2017-09-26 01:33:28,690 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>>>>>> - Triggering checkpoint 1142 @ 1506389608690
>>>>>>>> 2017-09-26 01:33:28,691 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>>>>>> - Checkpoint 1140 expired before completing.
>>>>>>>> 2017-09-26 01:38:28,691 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>>>>>> - Checkpoint 1141 expired before completing.
>>>>>>>> 2017-09-26 01:40:38,044 WARN org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>>>>>> - Received late message for now expired checkpoint attempt 1140 from
>>>>>>>> c63825d15de0fef55a1d148adcf4467e of job 7c039572b...
>>>>>>>> 2017-09-26 01:40:53,743 WARN org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>>>>>> - Received late message for now expired checkpoint attempt 1141 from
>>>>>>>> c63825d15de0fef55a1d148adcf4467e of job 7c039572b...
>>>>>>>> 2017-09-26 01:41:19,332 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>>>>>> - Completed checkpoint 1142 (136733704 bytes in 457413 ms).
>>>>>>>>
>>>>>>>> For chk #1245 and #1246, there was no late message from TM. You can
>>>>>>>> refer to the TM log. The full completed checkpoint attempt will have 12
>>>>>>>> (... asynchronous part) logs in general, but #1245 and #1246 only got 10
>>>>>>>> logs.
>>>>>>>>
>>>>>>>> 2017-09-26 10:08:28,690 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>>>>>> - Triggering checkpoint 1245 @ 1506420508690
>>>>>>>> 2017-09-26 10:13:28,690 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>>>>>> - Triggering checkpoint 1246 @ 1506420808690
>>>>>>>> 2017-09-26 10:18:28,691 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>>>>>> - Checkpoint 1245 expired before completing.
>>>>>>>> 2017-09-26 10:23:28,691 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>>>>>> - Checkpoint 1246 expired before completing.
>>>>>>>>
>>>>>>>> Moreover, I listed the directory for checkpoints on S3 and saw
>>>>>>>> there were two states not discarded successfully. In general, there will be
>>>>>>>> 16 parts for a completed checkpoint state.
>>>>>>>>
>>>>>>>> 2017-09-26 18:08:33 36919 tony-dev/flink-checkpoints/7c0
>>>>>>>> 39572b13346f1b17dcc0ace2b72c2/chk-1245/eedd7ca5-ee34-45a5-
>>>>>>>> bf0b-11cc1fc67ab8
>>>>>>>> 2017-09-26 18:13:34 37419 tony-dev/flink-checkpoints/7c0
>>>>>>>> 39572b13346f1b17dcc0ace2b72c2/chk-1246/9aa5c6c4-8c74-465d-
>>>>>>>> 8509-5fea4ed25af6
>>>>>>>>
>>>>>>>> Hope these informations are helpful. Thank you.
>>>>>>>>
>>>>>>>> Best Regards,
>>>>>>>> Tony Wei
>>>>>>>>
>>>>>>>> 2017-09-27 16:14 GMT+08:00 Stefan Richter <
>>>>>>>> s.richter@data-artisans.com>:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> thanks for the information. Unfortunately, I have no immediate
>>>>>>>>> idea what the reason is from the given information. I think most helpful
>>>>>>>>> could be a thread dump, but also metrics on the operator operator level to
>>>>>>>>> figure out which part of the pipeline is the culprit.
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Stefan
>>>>>>>>>
>>>>>>>>> Am 26.09.2017 um 17:55 schrieb Tony Wei <to...@gmail.com>:
>>>>>>>>>
>>>>>>>>> Hi Stefan,
>>>>>>>>>
>>>>>>>>> There is no unknown exception in my full log. The Flink version is
>>>>>>>>> 1.3.2.
>>>>>>>>> My job is roughly like this.
>>>>>>>>>
>>>>>>>>> env.addSource(Kafka)
>>>>>>>>>   .map(ParseKeyFromRecord)
>>>>>>>>>   .keyBy()
>>>>>>>>>   .process(CountAndTimeoutWindow)
>>>>>>>>>   .asyncIO(UploadToS3)
>>>>>>>>>   .addSink(UpdateDatabase)
>>>>>>>>>
>>>>>>>>> It seemed all tasks stopped like the picture I sent in the last
>>>>>>>>> email.
>>>>>>>>>
>>>>>>>>> I will keep my eye on taking a thread dump from that JVM if this
>>>>>>>>> happens again.
>>>>>>>>>
>>>>>>>>> Best Regards,
>>>>>>>>> Tony Wei
>>>>>>>>>
>>>>>>>>> 2017-09-26 23:46 GMT+08:00 Stefan Richter <
>>>>>>>>> s.richter@data-artisans.com>:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> that is very strange indeed. I had a look at the logs and there
>>>>>>>>>> is no error or exception reported. I assume there is also no exception in
>>>>>>>>>> your full logs? Which version of flink are you using and what operators
>>>>>>>>>> were running in the task that stopped? If this happens again, would it be
>>>>>>>>>> possible to take a thread dump from that JVM?
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Stefan
>>>>>>>>>>
>>>>>>>>>> > Am 26.09.2017 um 17:08 schrieb Tony Wei <tony19920430@gmail.com
>>>>>>>>>> >:
>>>>>>>>>> >
>>>>>>>>>> > Hi,
>>>>>>>>>> >
>>>>>>>>>> > Something weird happened on my streaming job.
>>>>>>>>>> >
>>>>>>>>>> > I found my streaming job seems to be blocked for a long time
>>>>>>>>>> and I saw the situation like the picture below. (chk #1245 and #1246 were
>>>>>>>>>> all finishing 7/8 tasks then marked timeout by JM. Other checkpoints failed
>>>>>>>>>> with the same state like #1247 util I restarted TM.)
>>>>>>>>>> >
>>>>>>>>>> > <snapshot.png>
>>>>>>>>>> >
>>>>>>>>>> > I'm not sure what happened, but the consumer stopped fetching
>>>>>>>>>> records, buffer usage is 100% and the following task did not seem to fetch
>>>>>>>>>> data anymore. Just like the whole TM was stopped.
>>>>>>>>>> >
>>>>>>>>>> > However, after I restarted TM and force the job restarting from
>>>>>>>>>> the latest completed checkpoint, everything worked again. And I don't know
>>>>>>>>>> how to reproduce it.
>>>>>>>>>> >
>>>>>>>>>> > The attachment is my TM log. Because there are many user logs
>>>>>>>>>> and sensitive information, I only remain the log from `org.apache.flink...`.
>>>>>>>>>> >
>>>>>>>>>> > My cluster setting is one JM and one TM with 4 available slots.
>>>>>>>>>> >
>>>>>>>>>> > Streaming job uses all slots, checkpoint interval is 5 mins and
>>>>>>>>>> max concurrent number is 3.
>>>>>>>>>> >
>>>>>>>>>> > Please let me know if it needs more information to find out
>>>>>>>>>> what happened on my streaming job. Thanks for your help.
>>>>>>>>>> >
>>>>>>>>>> > Best Regards,
>>>>>>>>>> > Tony Wei
>>>>>>>>>> > <flink-root-taskmanager-0-partial.log>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>> <chk_ 1577.log>
>>>
>>>
>>>
> <threaddumps.log>
>
>
>

Re: Stream Task seems to be blocked after checkpoint timeout

Posted by Stefan Richter <s....@data-artisans.com>.
Hi,

from the stack trace, it seems to me like closing the checkpoint output stream to S3 is the culprit:

"pool-55-thread-7" #458829 prio=5 os_prio=0 tid=0x00007fda180c4000 nid=0x55a2 waiting on condition [0x00007fda092d7000]
   java.lang.Thread.State: WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for  <0x00000007154050b8> (a java.util.concurrent.FutureTask)
	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
	at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:429)
	at java.util.concurrent.FutureTask.get(FutureTask.java:191)
	at com.amazonaws.services.s3.transfer.internal.UploadImpl.waitForUploadResult(UploadImpl.java:66)
	at org.apache.hadoop.fs.s3a.S3AOutputStream.close(S3AOutputStream.java:131)
	- locked <0x00000007154801d0> (a org.apache.hadoop.fs.s3a.S3AOutputStream)
	at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
	at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
	at org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream.close(HadoopDataOutputStream.java:48)
	at org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
	at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:319)
	- locked <0x0000000715480238> (a org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream)
	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullSnapshotOperation.closeSnapshotStreamAndGetHandle(RocksDBKeyedStateBackend.java:693)
	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullSnapshotOperation.closeCheckpointStream(RocksDBKeyedStateBackend.java:531)
	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$3.performOperation(RocksDBKeyedStateBackend.java:420)
	- locked <0x000000073ef55b00> (a org.apache.flink.runtime.util.SerializableObject)
	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$3.performOperation(RocksDBKeyedStateBackend.java:399)
	at org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.call(AbstractAsyncIOCallable.java:72)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
	at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

In particular, this holds lock 0x000000073ef55b00, which blocks the next checkpoint in it’s synchronous phase:

"count-with-timeout-window -> s3-uploader -> Sink: meta-store-committer (7/12)" #454093 daemon prio=5 os_prio=0 tid=0x00007fda28040000 nid=0x2f3b waiting for monitor entry [0x00007fda0a5e8000]
   java.lang.Thread.State: BLOCKED (on object monitor)
	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.snapshotFully(RocksDBKeyedStateBackend.java:379)
	- waiting to lock <0x000000073ef55b00> (a org.apache.flink.runtime.util.SerializableObject)
	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.snapshot(RocksDBKeyedStateBackend.java:317)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:397)
	at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1162)
	at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1094)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:654)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:590)
	- locked <0x000000073ee55068> (a java.lang.Object)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:543)
	at org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:378)
	at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:281)
	at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:183)
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:213)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
	at java.lang.Thread.run(Thread.java:748)

This, in turn, blocks the operators main processing loop (I marked it further down the trace) and processing stops.

So while I assume that something bad happens with your S3, it is also not nice that this brings down the pipeline in such ways.

I had actually already created a branch that aims to improve (=reduce) the whole locking in the RockDBKeyedStateBackend to prevent exactly such scenarios: 
Right now, the whole purpose of this lock is protecting the RocksDB instance from getting disposed while concurrent operations, such as checkpoints, are still running. Protecting the RocksDB instance is important because it is a native library and accessing a disposed instance will cause segfaults. 
However, it is actually not required to hold on to the lock all the time. The idea behind my change is to have a synchronized client counter to track all ongoing workers that use the RocksDB instance, so only incrementing, decrementing, and checking the counter happens under the lock. Disposing the RocksDB instance can then only start when the „client count“ is zero, and after it started, no new clients can register. So it is similar to reader/writer locking, where all ops on the DB are „reader" and disposing the instance is the „writer".

I am currently on holidays, maybe this small change is quiet useful and I will prioritize it a bit when I am back. Nevertheless, I suggest to investigate why S3 is behaving like this.

Best,
Stefan



> Am 03.10.2017 um 07:26 schrieb Tony Wei <to...@gmail.com>:
> 
> Hi Stefan,
> 
> It seems that the similar situation, in which job blocked after checkpoint timeout, came across to my job. BTW, this is another job that I raised parallelism and throughput of input.
> 
> After chk #8 started, the whole operator seems blocked.
> 
> I recorded some JM / TM logs, snapshots and thread dump logs, which the attachment is. Hope these will help to find the root cause. Thank you.
> 
> Best Regards,
> Tony Wie
> 
> ==========================================================================================================================================================
> 
> JM log:
> 
> 2017-10-03 03:46:49,371 WARN  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Received late message for now expired checkpoint attempt 7 from b52ef54ad4feb0c6b85a8b8453bff419 of job ecfa5968e831e547ed70d1359a615f72.
> 2017-10-03 03:47:00,977 WARN  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Received late message for now expired checkpoint attempt 8 from b52ef54ad4feb0c6b85a8b8453bff419 of job ecfa5968e831e547ed70d1359a615f72.
> 
> TM log:
> 
> 2017-10-03 03:46:46,962 INFO  org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - Asynchronous RocksDB snapshot (File Stream Factory @ s3://tony-dev/flink-checkpoints/ecfa5968e831e547ed70d1359a615f72, asynchronous part) in thread Thread[pool-55-thread-7,5,Flink Task Threads] took 1211517 ms.
> 
> Snapshots:
> 
> <???? 2017-10-03  下午12.20.11.png>
> <???? 2017-10-03  下午12.22.10.png>
> 
> 2017-09-28 20:29 GMT+08:00 Tony Wei <tony19920430@gmail.com <ma...@gmail.com>>:
> Hi Stefan,
> 
> That reason makes sense to me. Thanks for point me out. 
> 
> About my job, the database currently was never used, I disabled it for some reasons, but output to s3 was implemented by async io. 
> 
> I used ForkJoinPool with 50 capacity. 
> I have tried to rebalance after count window to monitor the back pressure on upload operator. 
> The result is always OK status. 
> I think the reason is due to that count window buffered lots of records, so the input rate in upload operator was not too high. 
> 
> But I am not sure that if the setup for my capacity of ForkJoinPool would impact the process asynchronous checkpoints both machine's resources and s3 connection. 
> 
> BTW, s3 serves both operator and checkpointing and I used aws java api to access s3 in upload operator in order to control where the files go. 
> 
> Best Regards,
> Tony Wei
> 
> Stefan Richter <s.richter@data-artisans.com <ma...@data-artisans.com>>於 2017年9月28日 週四,下午7:43寫道:
> Hi,
> 
> the gap between the sync and the async part does not mean too much. What happens per task is that all operators go through their sync part, and then one thread executes all the async parts, one after the other. So if an async part starts late, this is just because it started only after another async part finished.
> 
> I have one more question about your job,because it involves communication with external systems, like S3 and a database. Are you sure that they cannot sometimes become a bottleneck, block, and bring down your job. in particular: is the same S3 used to serve the operator and checkpointing and what is your sustained read/write rate there and the maximum number of connections? You can try to use the backpressure metric and try to identify the first operator (counting from the sink) that indicates backpressure.
> 
> Best,
> Stefan
> 
> 
>> Am 28.09.2017 um 12:59 schrieb Tony Wei <tony19920430@gmail.com <ma...@gmail.com>>:
>> 
> 
>> Hi,
>> 
>> Sorry. This is the correct one.
>> 
>> Best Regards,
>> Tony Wei
>> 
>> 2017-09-28 18:55 GMT+08:00 Tony Wei <tony19920430@gmail.com <ma...@gmail.com>>:
>> Hi Stefan, 
>> 
>> Sorry for providing partial information. The attachment is the full logs for checkpoint #1577.
>> 
>> Why I would say it seems that asynchronous part was not executed immediately is due to all synchronous parts were all finished at 2017-09-27 13:49.
>> Did that mean the checkpoint barrier event had already arrived at the operator and started as soon as when the JM triggered the checkpoint?
>> 
>> Best Regards,
>> Tony Wei
>> 
>> 2017-09-28 18:22 GMT+08:00 Stefan Richter <s.richter@data-artisans.com <ma...@data-artisans.com>>:
>> Hi,
>> 
>> I agree that the memory consumption looks good. If there is only one TM, it will run inside one JVM. As for the 7 minutes, you mean the reported end-to-end time? This time measurement starts when the checkpoint is triggered on the job manager, the first contributor is then the time that it takes for the checkpoint barrier event to travel with the stream to the operators. If there is back pressure and a lot of events are buffered, this can introduce delay to this first part, because barriers must not overtake data for correctness. After the barrier arrives at the operator, next comes the synchronous part of the checkpoint, which is typically short running and takes a snapshot of the state (think of creating an immutable version, e.g. through copy on write). In the asynchronous part, this snapshot is persisted to DFS. After that the timing stops and is reported together with the acknowledgement to the job manager. 
>> 
>> So, I would assume if reporting took 7 minutes end-to-end, and the async part took 4 minutes, it is likely that it took around 3 minutes for the barrier event to travel with the stream. About the debugging, I think it is hard to figure out what is going on with the DFS if you don’t have metrics on that. Maybe you could attach a sampler to the TM’s jvm and monitor where time is spend for the snapshotting?
>> 
>> I am also looping in Stephan, he might have more suggestions.
>> 
>> Best,
>> Stefan
>> 
>>> Am 28.09.2017 um 11:25 schrieb Tony Wei <tony19920430@gmail.com <ma...@gmail.com>>:
>>> 
>>> Hi Stefan,
>>> 
>>> These are some telemetry information, but I don't have history information about gc.
>>> 
>>> <???? 2017-09-2 8 下午4.51.26.png>
>>> <???? 2017-09-2 8 下午4.51.11.png>
>>> 
>>> 1) Yes, my state is not large.
>>> 2) My DFS is S3, but my cluster is out of AWS. It might be a problem. Since this is a POC, we might move to AWS in the future or use HDFS in the same cluster. However, how can I recognize the problem is this.
>>> 3) It seems memory usage is bounded. I'm not sure if the status showed above is fine.
>>> 
>>> There is only one TM in my cluster for now, so all tasks are running on that machine. I think that means they are in the same JVM, right?
>>> Besides taking so long on asynchronous part, there is another question is that the late message showed that this task was delay for almost 7 minutes, but the log showed it only took 4 minutes.
>>> It seems that it was somehow waiting for being executed. Are there some points to find out what happened?
>>> 
>>> For the log information, what I means is it is hard to recognize which checkpoint id that asynchronous parts belong to if the checkpoint takes more time and there are more concurrent checkpoints taking place.
>>> Also, it seems that asynchronous part might be executed right away if there is no resource from thread pool. It is better to measure the time between creation time and processing time, and log it and checkpoint id with the original log that showed what time the asynchronous part took.
>>> 
>>> Best Regards,
>>> Tony Wei
>>> 
>>> 2017-09-28 16:25 GMT+08:00 Stefan Richter <s.richter@data-artisans.com <ma...@data-artisans.com>>:
>>> Hi,
>>> 
>>> when the async part takes that long I would have 3 things to look at:
>>> 
>>> 1) Is your state so large? I don’t think this applies in your case, right?
>>> 2) Is something wrong with writing to DFS (network, disks, etc)?
>>> 3) Are we running low on memory on that task manager?
>>> 
>>> Do you have telemetry information about used heap and gc pressure on the problematic task? However, what speaks against the memory problem hypothesis is that future checkpoints seem to go through again. What I find very strange is that within the reported 4 minutes of the async part the only thing that happens is: open dfs output stream, iterate the in-memory state and write serialized state data to dfs stream, then close the stream. No locks or waits in that section, so I would assume that for one of the three reasons I gave, writing the state is terribly slow.
>>> 
>>> Those snapshots should be able to run concurrently, for example so that users can also take savepoints  even when a checkpoint was triggered and is still running, so there is no way to guarantee that the previous parts have finished, this is expected behaviour. Which waiting times are you missing in the log? I think the information about when a checkpoint is triggered, received by the TM, performing the sync and async part and acknowledgement time should all be there?.
>>> 
>>> Best,
>>> Stefan
>>> 
>>> 
>>> 
>>>> Am 28.09.2017 um 08:18 schrieb Tony Wei <tony19920430@gmail.com <ma...@gmail.com>>:
>>>> 
>>>> Hi Stefan,
>>>> 
>>>> The checkpoint on my job has been subsumed again. There are some questions that I don't understand.
>>>> 
>>>> Log in JM :
>>>> 2017-09-27 13:45:15,686 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 1576 (174693180 bytes in 21597 ms).
>>>> 2017-09-27 13:49:42,795 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1577 @ 1506520182795
>>>> 2017-09-27 13:54:42,795 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1578 @ 1506520482795
>>>> 2017-09-27 13:55:13,105 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 1578 (152621410 bytes in 19109 ms).
>>>> 2017-09-27 13:56:37,103 WARN org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late message for now expired checkpoint attempt 1577 from 2273da50f29b9dee731f7bd749e91c80 of job 7c039572b....
>>>> 2017-09-27 13:59:42,795 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1579 @ 1506520782795
>>>> 
>>>> Log in TM:
>>>> 2017-09-27 13:56:37,105 INFO org.apache.flink.runtime.state.DefaultOperatorStateBackend - DefaultOperatorStateBackend snapshot (File Stream Factory @ s3://tony-dev/flink- <>checkpoints/7c039572b13346f1b17dcc0ace2b72c2, asynchronous part) in thread Thread[pool-7-thread-322,5,Flink Task Threads] took 240248 ms.
>>>> 
>>>> I think the log in TM might be the late message for #1577 in JM, because #1576, #1578 had been finished and #1579 hadn't been started at 13:56:37.
>>>> If there is no mistake on my words, I am wondering why the time it took was 240248 ms (4 min). It seems that it started late than asynchronous tasks in #1578.
>>>> Is there any way to guarantee the previous asynchronous parts of checkpoints will be executed before the following.
>>>> 
>>>> Moreover, I think it will be better to have more information in INFO log, such as waiting time and checkpoint id, in order to trace the progress of checkpoint conveniently.
>>>> 
>>>> What do you think? Do you have any suggestion for me to deal with these problems? Thank you.
>>>> 
>>>> Best Regards,
>>>> Tony Wei
>>>> 
>>>> 2017-09-27 17:11 GMT+08:00 Tony Wei <tony19920430@gmail.com <ma...@gmail.com>>:
>>>> Hi Stefan,
>>>> 
>>>> Here is the summary for my streaming job's checkpoint after restarting at last night.
>>>> 
>>>> <???? 2017-09-2 7 下午4.56.30.png>
>>>> 
>>>> This is the distribution of alignment buffered from the last 12 hours.
>>>> 
>>>> <???? 2017-09-2 7 下午5.05.11.png>
>>>> 
>>>> And here is the buffer out pool usage during chk #1140 ~ #1142. For chk #1245 and #1246, you can check the picture I sent before.
>>>> 
>>>>  <???? 2017-09-2 7 下午5.01.24.png>
>>>> 
>>>> AFAIK, the back pressure rate usually is in LOW status, sometimes goes up to HIGH, and always OK during the night.
>>>> 
>>>> Best Regards,
>>>> Tony Wei
>>>> 
>>>> 
>>>> 2017-09-27 16:54 GMT+08:00 Stefan Richter <s.richter@data-artisans.com <ma...@data-artisans.com>>:
>>>> Hi Tony,
>>>> 
>>>> are your checkpoints typically close to the timeout boundary? From what I see, writing the checkpoint is relatively fast but the time from the checkpoint trigger to execution seems very long. This is typically the case if your job has a lot of backpressure and therefore the checkpoint barriers take a long time to travel to the operators, because a lot of events are piling up in the buffers. Do you also experience large alignments for your checkpoints?
>>>> 
>>>> Best,
>>>> Stefan  
>>>> 
>>>>> Am 27.09.2017 um 10:43 schrieb Tony Wei <tony19920430@gmail.com <ma...@gmail.com>>:
>>>>> 
>>>>> Hi Stefan,
>>>>> 
>>>>> It seems that I found something strange from JM's log.
>>>>> 
>>>>> It had happened more than once before, but all subtasks would finish their checkpoint attempts in the end.
>>>>> 
>>>>> 2017-09-26 01:23:28,690 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1140 @ 1506389008690
>>>>> 2017-09-26 01:28:28,690 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1141 @ 1506389308690
>>>>> 2017-09-26 01:33:28,690 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1142 @ 1506389608690
>>>>> 2017-09-26 01:33:28,691 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 1140 expired before completing.
>>>>> 2017-09-26 01:38:28,691 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 1141 expired before completing.
>>>>> 2017-09-26 01:40:38,044 WARN org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late message for now expired checkpoint attempt 1140 from c63825d15de0fef55a1d148adcf4467e of job 7c039572b...
>>>>> 2017-09-26 01:40:53,743 WARN org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late message for now expired checkpoint attempt 1141 from c63825d15de0fef55a1d148adcf4467e of job 7c039572b...
>>>>> 2017-09-26 01:41:19,332 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 1142 (136733704 bytes in 457413 ms).
>>>>> 
>>>>> For chk #1245 and #1246, there was no late message from TM. You can refer to the TM log. The full completed checkpoint attempt will have 12 (... asynchronous part) logs in general, but #1245 and #1246 only got 10 logs.
>>>>> 
>>>>> 2017-09-26 10:08:28,690 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1245 @ 1506420508690
>>>>> 2017-09-26 10:13:28,690 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1246 @ 1506420808690
>>>>> 2017-09-26 10:18:28,691 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 1245 expired before completing.
>>>>> 2017-09-26 10:23:28,691 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 1246 expired before completing.
>>>>> 
>>>>> Moreover, I listed the directory for checkpoints on S3 and saw there were two states not discarded successfully. In general, there will be 16 parts for a completed checkpoint state.
>>>>> 
>>>>> 2017-09-26 18:08:33 36919 tony-dev/flink-checkpoints/7c039572b13346f1b17dcc0ace2b72c2/chk-1245/eedd7ca5-ee34-45a5-bf0b-11cc1fc67ab8
>>>>> 2017-09-26 18:13:34 37419 tony-dev/flink-checkpoints/7c039572b13346f1b17dcc0ace2b72c2/chk-1246/9aa5c6c4-8c74-465d-8509-5fea4ed25af6
>>>>> 
>>>>> Hope these informations are helpful. Thank you.
>>>>> 
>>>>> Best Regards,
>>>>> Tony Wei
>>>>> 
>>>>> 2017-09-27 16:14 GMT+08:00 Stefan Richter <s.richter@data-artisans.com <ma...@data-artisans.com>>:
>>>>> Hi,
>>>>> 
>>>>> thanks for the information. Unfortunately, I have no immediate idea what the reason is from the given information. I think most helpful could be a thread dump, but also metrics on the operator operator level to figure out which part of the pipeline is the culprit.
>>>>> 
>>>>> Best,
>>>>> Stefan
>>>>> 
>>>>>> Am 26.09.2017 um 17:55 schrieb Tony Wei <tony19920430@gmail.com <ma...@gmail.com>>:
>>>>>> 
>>>>>> Hi Stefan,
>>>>>> 
>>>>>> There is no unknown exception in my full log. The Flink version is 1.3.2.
>>>>>> My job is roughly like this.
>>>>>> 
>>>>>> env.addSource(Kafka)
>>>>>>   .map(ParseKeyFromRecord)
>>>>>>   .keyBy()
>>>>>>   .process(CountAndTimeoutWindow)
>>>>>>   .asyncIO(UploadToS3)
>>>>>>   .addSink(UpdateDatabase)
>>>>>> 
>>>>>> It seemed all tasks stopped like the picture I sent in the last email.
>>>>>> 
>>>>>> I will keep my eye on taking a thread dump from that JVM if this happens again.
>>>>>> 
>>>>>> Best Regards,
>>>>>> Tony Wei
>>>>>> 
>>>>>> 2017-09-26 23:46 GMT+08:00 Stefan Richter <s.richter@data-artisans.com <ma...@data-artisans.com>>:
>>>>>> Hi,
>>>>>> 
>>>>>> that is very strange indeed. I had a look at the logs and there is no error or exception reported. I assume there is also no exception in your full logs? Which version of flink are you using and what operators were running in the task that stopped? If this happens again, would it be possible to take a thread dump from that JVM?
>>>>>> 
>>>>>> Best,
>>>>>> Stefan
>>>>>> 
>>>>>> > Am 26.09.2017 um 17:08 schrieb Tony Wei <tony19920430@gmail.com <ma...@gmail.com>>:
>>>>>> >
>>>>>> > Hi,
>>>>>> >
>>>>>> > Something weird happened on my streaming job.
>>>>>> >
>>>>>> > I found my streaming job seems to be blocked for a long time and I saw the situation like the picture below. (chk #1245 and #1246 were all finishing 7/8 tasks then marked timeout by JM. Other checkpoints failed with the same state like #1247 util I restarted TM.)
>>>>>> >
>>>>>> > <snapshot.png>
>>>>>> >
>>>>>> > I'm not sure what happened, but the consumer stopped fetching records, buffer usage is 100% and the following task did not seem to fetch data anymore. Just like the whole TM was stopped.
>>>>>> >
>>>>>> > However, after I restarted TM and force the job restarting from the latest completed checkpoint, everything worked again. And I don't know how to reproduce it.
>>>>>> >
>>>>>> > The attachment is my TM log. Because there are many user logs and sensitive information, I only remain the log from `org.apache.flink...`.
>>>>>> >
>>>>>> > My cluster setting is one JM and one TM with 4 available slots.
>>>>>> >
>>>>>> > Streaming job uses all slots, checkpoint interval is 5 mins and max concurrent number is 3.
>>>>>> >
>>>>>> > Please let me know if it needs more information to find out what happened on my streaming job. Thanks for your help.
>>>>>> >
>>>>>> > Best Regards,
>>>>>> > Tony Wei
>>>>>> > <flink-root-taskmanager-0-partial.log>
>>>>>> 
>>>>>> 
>>>>> 
>>>>> 
>>>> 
>>>> 
>>>> 
>>> 
>>> 
>> 
>> 
>> 
> 
>> <chk_ 1577.log>
> 
> 
> <threaddumps.log>


Re: Stream Task seems to be blocked after checkpoint timeout

Posted by Tony Wei <to...@gmail.com>.
Hi Stefan,

It seems that the similar situation, in which job blocked after checkpoint
timeout, came across to my job. BTW, this is another job that I raised
parallelism and throughput of input.

After chk #8 started, the whole operator seems blocked.

I recorded some JM / TM logs, snapshots and thread dump logs, which the
attachment is. Hope these will help to find the root cause. Thank you.

Best Regards,
Tony Wie

==========================================================================================================================================================

JM log:

2017-10-03 03:46:49,371 WARN
org.apache.flink.runtime.checkpoint.CheckpointCoordinator
    - Received late message for now expired checkpoint attempt 7 from
b52ef54ad4feb0c6b85a8b8453bff419 of job ecfa5968e831e547ed70d1359a615f72.
2017-10-03 03:47:00,977 WARN
org.apache.flink.runtime.checkpoint.CheckpointCoordinator
    - Received late message for now expired checkpoint attempt 8 from
b52ef54ad4feb0c6b85a8b8453bff419 of job ecfa5968e831e547ed70d1359a615f72.

TM log:

2017-10-03 03:46:46,962 INFO
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend  -
Asynchronous RocksDB snapshot (File Stream Factory @
s3://tony-dev/flink-checkpoints/ecfa5968e831e547ed70d1359a615f72,
asynchronous part) in thread Thread[pool-55-thread-7,5,Flink Task Threads]
took 1211517 ms.

Snapshots:

[image: 內置圖片 1]
[image: 內置圖片 3]

2017-09-28 20:29 GMT+08:00 Tony Wei <to...@gmail.com>:

> Hi Stefan,
>
> That reason makes sense to me. Thanks for point me out.
>
> About my job, the database currently was never used, I disabled it for
> some reasons, but output to s3 was implemented by async io.
>
> I used ForkJoinPool with 50 capacity.
> I have tried to rebalance after count window to monitor the back pressure
> on upload operator.
> The result is always OK status.
> I think the reason is due to that count window buffered lots of records,
> so the input rate in upload operator was not too high.
>
> But I am not sure that if the setup for my capacity of ForkJoinPool would
> impact the process asynchronous checkpoints both machine's resources and s3
> connection.
>
> BTW, s3 serves both operator and checkpointing and I used aws java api to
> access s3 in upload operator in order to control where the files go.
>
> Best Regards,
> Tony Wei
>
> Stefan Richter <s....@data-artisans.com>於 2017年9月28日 週四,下午7:43寫道:
>
>> Hi,
>>
>> the gap between the sync and the async part does not mean too much. What
>> happens per task is that all operators go through their sync part, and then
>> one thread executes all the async parts, one after the other. So if an
>> async part starts late, this is just because it started only after another
>> async part finished.
>>
>> I have one more question about your job,because it involves communication
>> with external systems, like S3 and a database. Are you sure that they
>> cannot sometimes become a bottleneck, block, and bring down your job. in
>> particular: is the same S3 used to serve the operator and checkpointing and
>> what is your sustained read/write rate there and the maximum number of
>> connections? You can try to use the backpressure metric and try to identify
>> the first operator (counting from the sink) that indicates backpressure.
>>
>> Best,
>> Stefan
>>
>> Am 28.09.2017 um 12:59 schrieb Tony Wei <to...@gmail.com>:
>>
>> Hi,
>>
>> Sorry. This is the correct one.
>>
>> Best Regards,
>> Tony Wei
>>
>> 2017-09-28 18:55 GMT+08:00 Tony Wei <to...@gmail.com>:
>>
>>> Hi Stefan,
>>>
>>> Sorry for providing partial information. The attachment is the full logs
>>> for checkpoint #1577.
>>>
>>> Why I would say it seems that asynchronous part was not executed
>>> immediately is due to all synchronous parts were all finished at 2017-09-27
>>> 13:49.
>>> Did that mean the checkpoint barrier event had already arrived at the
>>> operator and started as soon as when the JM triggered the checkpoint?
>>>
>>> Best Regards,
>>> Tony Wei
>>>
>>> 2017-09-28 18:22 GMT+08:00 Stefan Richter <s....@data-artisans.com>:
>>>
>>>> Hi,
>>>>
>>>> I agree that the memory consumption looks good. If there is only one
>>>> TM, it will run inside one JVM. As for the 7 minutes, you mean the reported
>>>> end-to-end time? This time measurement starts when the checkpoint is
>>>> triggered on the job manager, the first contributor is then the time that
>>>> it takes for the checkpoint barrier event to travel with the stream to the
>>>> operators. If there is back pressure and a lot of events are buffered, this
>>>> can introduce delay to this first part, because barriers must not overtake
>>>> data for correctness. After the barrier arrives at the operator, next comes
>>>> the synchronous part of the checkpoint, which is typically short running
>>>> and takes a snapshot of the state (think of creating an immutable version,
>>>> e.g. through copy on write). In the asynchronous part, this snapshot is
>>>> persisted to DFS. After that the timing stops and is reported together with
>>>> the acknowledgement to the job manager.
>>>>
>>>> So, I would assume if reporting took 7 minutes end-to-end, and the
>>>> async part took 4 minutes, it is likely that it took around 3 minutes for
>>>> the barrier event to travel with the stream. About the debugging, I think
>>>> it is hard to figure out what is going on with the DFS if you don’t have
>>>> metrics on that. Maybe you could attach a sampler to the TM’s jvm and
>>>> monitor where time is spend for the snapshotting?
>>>>
>>>> I am also looping in Stephan, he might have more suggestions.
>>>>
>>>> Best,
>>>> Stefan
>>>>
>>>> Am 28.09.2017 um 11:25 schrieb Tony Wei <to...@gmail.com>:
>>>>
>>>> Hi Stefan,
>>>>
>>>> These are some telemetry information, but I don't have history
>>>> information about gc.
>>>>
>>>> <???? 2017-09-2 8 下午4.51.26.png>
>>>> <???? 2017-09-2 8 下午4.51.11.png>
>>>>
>>>> 1) Yes, my state is not large.
>>>> 2) My DFS is S3, but my cluster is out of AWS. It might be a problem.
>>>> Since this is a POC, we might move to AWS in the future or use HDFS in the
>>>> same cluster. However, how can I recognize the problem is this.
>>>> 3) It seems memory usage is bounded. I'm not sure if the status showed
>>>> above is fine.
>>>>
>>>> There is only one TM in my cluster for now, so all tasks are running on
>>>> that machine. I think that means they are in the same JVM, right?
>>>> Besides taking so long on asynchronous part, there is another question
>>>> is that the late message showed that this task was delay for almost 7
>>>> minutes, but the log showed it only took 4 minutes.
>>>> It seems that it was somehow waiting for being executed. Are there some
>>>> points to find out what happened?
>>>>
>>>> For the log information, what I means is it is hard to recognize which
>>>> checkpoint id that asynchronous parts belong to if the checkpoint takes
>>>> more time and there are more concurrent checkpoints taking place.
>>>> Also, it seems that asynchronous part might be executed right away if
>>>> there is no resource from thread pool. It is better to measure the time
>>>> between creation time and processing time, and log it and checkpoint id
>>>> with the original log that showed what time the asynchronous part took.
>>>>
>>>> Best Regards,
>>>> Tony Wei
>>>>
>>>> 2017-09-28 16:25 GMT+08:00 Stefan Richter <s....@data-artisans.com>
>>>> :
>>>>
>>>>> Hi,
>>>>>
>>>>> when the async part takes that long I would have 3 things to look at:
>>>>>
>>>>> 1) Is your state so large? I don’t think this applies in your case,
>>>>> right?
>>>>> 2) Is something wrong with writing to DFS (network, disks, etc)?
>>>>> 3) Are we running low on memory on that task manager?
>>>>>
>>>>> Do you have telemetry information about used heap and gc pressure on
>>>>> the problematic task? However, what speaks against the memory problem
>>>>> hypothesis is that future checkpoints seem to go through again. What I find
>>>>> very strange is that within the reported 4 minutes of the async part the
>>>>> only thing that happens is: open dfs output stream, iterate the in-memory
>>>>> state and write serialized state data to dfs stream, then close the stream.
>>>>> No locks or waits in that section, so I would assume that for one of the
>>>>> three reasons I gave, writing the state is terribly slow.
>>>>>
>>>>> Those snapshots should be able to run concurrently, for example so
>>>>> that users can also take savepoints  even when a checkpoint was triggered
>>>>> and is still running, so there is no way to guarantee that the previous
>>>>> parts have finished, this is expected behaviour. Which waiting times are
>>>>> you missing in the log? I think the information about when a checkpoint is
>>>>> triggered, received by the TM, performing the sync and async part and
>>>>> acknowledgement time should all be there?.
>>>>>
>>>>> Best,
>>>>> Stefan
>>>>>
>>>>>
>>>>>
>>>>> Am 28.09.2017 um 08:18 schrieb Tony Wei <to...@gmail.com>:
>>>>>
>>>>> Hi Stefan,
>>>>>
>>>>> The checkpoint on my job has been subsumed again. There are some
>>>>> questions that I don't understand.
>>>>>
>>>>> Log in JM :
>>>>> 2017-09-27 13:45:15,686 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>>> - Completed checkpoint 1576 (174693180 bytes in 21597 ms).
>>>>> 2017-09-27 13:49:42,795 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>>> - Triggering checkpoint 1577 @ 1506520182795
>>>>> 2017-09-27 13:54:42,795 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>>> - Triggering checkpoint 1578 @ 1506520482795
>>>>> 2017-09-27 13:55:13,105 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>>> - Completed checkpoint 1578 (152621410 bytes in 19109 ms).
>>>>> 2017-09-27 13:56:37,103 WARN org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>>> - Received late message for now expired checkpoint attempt 1577 from
>>>>> 2273da50f29b9dee731f7bd749e91c80 of job 7c039572b....
>>>>> 2017-09-27 13:59:42,795 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>>> - Triggering checkpoint 1579 @ 1506520782795
>>>>>
>>>>> Log in TM:
>>>>> 2017-09-27 13:56:37,105 INFO org.apache.flink.runtime.state.DefaultOperatorStateBackend
>>>>> - DefaultOperatorStateBackend snapshot (File Stream Factory @
>>>>> s3://tony-dev/flink-checkpoints/7c039572b13346f1b17dcc0ace2b72c2,
>>>>> asynchronous part) in thread Thread[pool-7-thread-322,5,Flink Task
>>>>> Threads] took 240248 ms.
>>>>>
>>>>> I think the log in TM might be the late message for #1577 in JM,
>>>>> because #1576, #1578 had been finished and #1579 hadn't been started at
>>>>> 13:56:37.
>>>>> If there is no mistake on my words, I am wondering why the time it
>>>>> took was 240248 ms (4 min). It seems that it started late than asynchronous
>>>>> tasks in #1578.
>>>>> Is there any way to guarantee the previous asynchronous parts of
>>>>> checkpoints will be executed before the following.
>>>>>
>>>>> Moreover, I think it will be better to have more information in INFO
>>>>> log, such as waiting time and checkpoint id, in order to trace the progress
>>>>> of checkpoint conveniently.
>>>>>
>>>>> What do you think? Do you have any suggestion for me to deal with
>>>>> these problems? Thank you.
>>>>>
>>>>> Best Regards,
>>>>> Tony Wei
>>>>>
>>>>> 2017-09-27 17:11 GMT+08:00 Tony Wei <to...@gmail.com>:
>>>>>
>>>>>> Hi Stefan,
>>>>>>
>>>>>> Here is the summary for my streaming job's checkpoint after
>>>>>> restarting at last night.
>>>>>>
>>>>>> <???? 2017-09-2 7 下午4.56.30.png>
>>>>>>
>>>>>> This is the distribution of alignment buffered from the last 12 hours.
>>>>>>
>>>>>> <???? 2017-09-2 7 下午5.05.11.png>
>>>>>>
>>>>>> And here is the buffer out pool usage during chk #1140 ~ #1142. For
>>>>>> chk #1245 and #1246, you can check the picture I sent before.
>>>>>>
>>>>>>  <???? 2017-09-2 7 下午5.01.24.png>
>>>>>>
>>>>>> AFAIK, the back pressure rate usually is in LOW status, sometimes
>>>>>> goes up to HIGH, and always OK during the night.
>>>>>>
>>>>>> Best Regards,
>>>>>> Tony Wei
>>>>>>
>>>>>>
>>>>>> 2017-09-27 16:54 GMT+08:00 Stefan Richter <
>>>>>> s.richter@data-artisans.com>:
>>>>>>
>>>>>>> Hi Tony,
>>>>>>>
>>>>>>> are your checkpoints typically close to the timeout boundary? From
>>>>>>> what I see, writing the checkpoint is relatively fast but the time from the
>>>>>>> checkpoint trigger to execution seems very long. This is typically the case
>>>>>>> if your job has a lot of backpressure and therefore the checkpoint barriers
>>>>>>> take a long time to travel to the operators, because a lot of events are
>>>>>>> piling up in the buffers. Do you also experience large alignments for your
>>>>>>> checkpoints?
>>>>>>>
>>>>>>> Best,
>>>>>>> Stefan
>>>>>>>
>>>>>>> Am 27.09.2017 um 10:43 schrieb Tony Wei <to...@gmail.com>:
>>>>>>>
>>>>>>> Hi Stefan,
>>>>>>>
>>>>>>> It seems that I found something strange from JM's log.
>>>>>>>
>>>>>>> It had happened more than once before, but all subtasks would finish
>>>>>>> their checkpoint attempts in the end.
>>>>>>>
>>>>>>> 2017-09-26 01:23:28,690 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>>>>> - Triggering checkpoint 1140 @ 1506389008690
>>>>>>> 2017-09-26 01:28:28,690 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>>>>> - Triggering checkpoint 1141 @ 1506389308690
>>>>>>> 2017-09-26 01:33:28,690 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>>>>> - Triggering checkpoint 1142 @ 1506389608690
>>>>>>> 2017-09-26 01:33:28,691 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>>>>> - Checkpoint 1140 expired before completing.
>>>>>>> 2017-09-26 01:38:28,691 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>>>>> - Checkpoint 1141 expired before completing.
>>>>>>> 2017-09-26 01:40:38,044 WARN org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>>>>> - Received late message for now expired checkpoint attempt 1140 from
>>>>>>> c63825d15de0fef55a1d148adcf4467e of job 7c039572b...
>>>>>>> 2017-09-26 01:40:53,743 WARN org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>>>>> - Received late message for now expired checkpoint attempt 1141 from
>>>>>>> c63825d15de0fef55a1d148adcf4467e of job 7c039572b...
>>>>>>> 2017-09-26 01:41:19,332 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>>>>> - Completed checkpoint 1142 (136733704 bytes in 457413 ms).
>>>>>>>
>>>>>>> For chk #1245 and #1246, there was no late message from TM. You can
>>>>>>> refer to the TM log. The full completed checkpoint attempt will have 12
>>>>>>> (... asynchronous part) logs in general, but #1245 and #1246 only got 10
>>>>>>> logs.
>>>>>>>
>>>>>>> 2017-09-26 10:08:28,690 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>>>>> - Triggering checkpoint 1245 @ 1506420508690
>>>>>>> 2017-09-26 10:13:28,690 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>>>>> - Triggering checkpoint 1246 @ 1506420808690
>>>>>>> 2017-09-26 10:18:28,691 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>>>>> - Checkpoint 1245 expired before completing.
>>>>>>> 2017-09-26 10:23:28,691 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>>>>> - Checkpoint 1246 expired before completing.
>>>>>>>
>>>>>>> Moreover, I listed the directory for checkpoints on S3 and saw there
>>>>>>> were two states not discarded successfully. In general, there will be 16
>>>>>>> parts for a completed checkpoint state.
>>>>>>>
>>>>>>> 2017-09-26 18:08:33 36919 tony-dev/flink-checkpoints/
>>>>>>> 7c039572b13346f1b17dcc0ace2b72c2/chk-1245/eedd7ca5-ee34-
>>>>>>> 45a5-bf0b-11cc1fc67ab8
>>>>>>> 2017-09-26 18:13:34 37419 tony-dev/flink-checkpoints/
>>>>>>> 7c039572b13346f1b17dcc0ace2b72c2/chk-1246/9aa5c6c4-8c74-
>>>>>>> 465d-8509-5fea4ed25af6
>>>>>>>
>>>>>>> Hope these informations are helpful. Thank you.
>>>>>>>
>>>>>>> Best Regards,
>>>>>>> Tony Wei
>>>>>>>
>>>>>>> 2017-09-27 16:14 GMT+08:00 Stefan Richter <
>>>>>>> s.richter@data-artisans.com>:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> thanks for the information. Unfortunately, I have no immediate idea
>>>>>>>> what the reason is from the given information. I think most helpful could
>>>>>>>> be a thread dump, but also metrics on the operator operator level to figure
>>>>>>>> out which part of the pipeline is the culprit.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Stefan
>>>>>>>>
>>>>>>>> Am 26.09.2017 um 17:55 schrieb Tony Wei <to...@gmail.com>:
>>>>>>>>
>>>>>>>> Hi Stefan,
>>>>>>>>
>>>>>>>> There is no unknown exception in my full log. The Flink version is
>>>>>>>> 1.3.2.
>>>>>>>> My job is roughly like this.
>>>>>>>>
>>>>>>>> env.addSource(Kafka)
>>>>>>>>   .map(ParseKeyFromRecord)
>>>>>>>>   .keyBy()
>>>>>>>>   .process(CountAndTimeoutWindow)
>>>>>>>>   .asyncIO(UploadToS3)
>>>>>>>>   .addSink(UpdateDatabase)
>>>>>>>>
>>>>>>>> It seemed all tasks stopped like the picture I sent in the last
>>>>>>>> email.
>>>>>>>>
>>>>>>>> I will keep my eye on taking a thread dump from that JVM if this
>>>>>>>> happens again.
>>>>>>>>
>>>>>>>> Best Regards,
>>>>>>>> Tony Wei
>>>>>>>>
>>>>>>>> 2017-09-26 23:46 GMT+08:00 Stefan Richter <
>>>>>>>> s.richter@data-artisans.com>:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> that is very strange indeed. I had a look at the logs and there is
>>>>>>>>> no error or exception reported. I assume there is also no exception in your
>>>>>>>>> full logs? Which version of flink are you using and what operators were
>>>>>>>>> running in the task that stopped? If this happens again, would it be
>>>>>>>>> possible to take a thread dump from that JVM?
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Stefan
>>>>>>>>>
>>>>>>>>> > Am 26.09.2017 um 17:08 schrieb Tony Wei <tony19920430@gmail.com
>>>>>>>>> >:
>>>>>>>>> >
>>>>>>>>> > Hi,
>>>>>>>>> >
>>>>>>>>> > Something weird happened on my streaming job.
>>>>>>>>> >
>>>>>>>>> > I found my streaming job seems to be blocked for a long time and
>>>>>>>>> I saw the situation like the picture below. (chk #1245 and #1246 were all
>>>>>>>>> finishing 7/8 tasks then marked timeout by JM. Other checkpoints failed
>>>>>>>>> with the same state like #1247 util I restarted TM.)
>>>>>>>>> >
>>>>>>>>> > <snapshot.png>
>>>>>>>>> >
>>>>>>>>> > I'm not sure what happened, but the consumer stopped fetching
>>>>>>>>> records, buffer usage is 100% and the following task did not seem to fetch
>>>>>>>>> data anymore. Just like the whole TM was stopped.
>>>>>>>>> >
>>>>>>>>> > However, after I restarted TM and force the job restarting from
>>>>>>>>> the latest completed checkpoint, everything worked again. And I don't know
>>>>>>>>> how to reproduce it.
>>>>>>>>> >
>>>>>>>>> > The attachment is my TM log. Because there are many user logs
>>>>>>>>> and sensitive information, I only remain the log from `org.apache.flink...`.
>>>>>>>>> >
>>>>>>>>> > My cluster setting is one JM and one TM with 4 available slots.
>>>>>>>>> >
>>>>>>>>> > Streaming job uses all slots, checkpoint interval is 5 mins and
>>>>>>>>> max concurrent number is 3.
>>>>>>>>> >
>>>>>>>>> > Please let me know if it needs more information to find out what
>>>>>>>>> happened on my streaming job. Thanks for your help.
>>>>>>>>> >
>>>>>>>>> > Best Regards,
>>>>>>>>> > Tony Wei
>>>>>>>>> > <flink-root-taskmanager-0-partial.log>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>
>> <chk_ 1577.log>
>>
>>
>>

Re: Stream Task seems to be blocked after checkpoint timeout

Posted by Tony Wei <to...@gmail.com>.
Hi Stefan,

That reason makes sense to me. Thanks for point me out.

About my job, the database currently was never used, I disabled it for some
reasons, but output to s3 was implemented by async io.

I used ForkJoinPool with 50 capacity.
I have tried to rebalance after count window to monitor the back pressure
on upload operator.
The result is always OK status.
I think the reason is due to that count window buffered lots of records, so
the input rate in upload operator was not too high.

But I am not sure that if the setup for my capacity of ForkJoinPool would
impact the process asynchronous checkpoints both machine's resources and s3
connection.

BTW, s3 serves both operator and checkpointing and I used aws java api to
access s3 in upload operator in order to control where the files go.

Best Regards,
Tony Wei

Stefan Richter <s....@data-artisans.com>於 2017年9月28日 週四,下午7:43寫道:

> Hi,
>
> the gap between the sync and the async part does not mean too much. What
> happens per task is that all operators go through their sync part, and then
> one thread executes all the async parts, one after the other. So if an
> async part starts late, this is just because it started only after another
> async part finished.
>
> I have one more question about your job,because it involves communication
> with external systems, like S3 and a database. Are you sure that they
> cannot sometimes become a bottleneck, block, and bring down your job. in
> particular: is the same S3 used to serve the operator and checkpointing and
> what is your sustained read/write rate there and the maximum number of
> connections? You can try to use the backpressure metric and try to identify
> the first operator (counting from the sink) that indicates backpressure.
>
> Best,
> Stefan
>
> Am 28.09.2017 um 12:59 schrieb Tony Wei <to...@gmail.com>:
>
> Hi,
>
> Sorry. This is the correct one.
>
> Best Regards,
> Tony Wei
>
> 2017-09-28 18:55 GMT+08:00 Tony Wei <to...@gmail.com>:
>
>> Hi Stefan,
>>
>> Sorry for providing partial information. The attachment is the full logs
>> for checkpoint #1577.
>>
>> Why I would say it seems that asynchronous part was not executed
>> immediately is due to all synchronous parts were all finished at 2017-09-27
>> 13:49.
>> Did that mean the checkpoint barrier event had already arrived at the
>> operator and started as soon as when the JM triggered the checkpoint?
>>
>> Best Regards,
>> Tony Wei
>>
>> 2017-09-28 18:22 GMT+08:00 Stefan Richter <s....@data-artisans.com>:
>>
>>> Hi,
>>>
>>> I agree that the memory consumption looks good. If there is only one TM,
>>> it will run inside one JVM. As for the 7 minutes, you mean the reported
>>> end-to-end time? This time measurement starts when the checkpoint is
>>> triggered on the job manager, the first contributor is then the time that
>>> it takes for the checkpoint barrier event to travel with the stream to the
>>> operators. If there is back pressure and a lot of events are buffered, this
>>> can introduce delay to this first part, because barriers must not overtake
>>> data for correctness. After the barrier arrives at the operator, next comes
>>> the synchronous part of the checkpoint, which is typically short running
>>> and takes a snapshot of the state (think of creating an immutable version,
>>> e.g. through copy on write). In the asynchronous part, this snapshot is
>>> persisted to DFS. After that the timing stops and is reported together with
>>> the acknowledgement to the job manager.
>>>
>>> So, I would assume if reporting took 7 minutes end-to-end, and the async
>>> part took 4 minutes, it is likely that it took around 3 minutes for the
>>> barrier event to travel with the stream. About the debugging, I think it is
>>> hard to figure out what is going on with the DFS if you don’t have metrics
>>> on that. Maybe you could attach a sampler to the TM’s jvm and monitor where
>>> time is spend for the snapshotting?
>>>
>>> I am also looping in Stephan, he might have more suggestions.
>>>
>>> Best,
>>> Stefan
>>>
>>> Am 28.09.2017 um 11:25 schrieb Tony Wei <to...@gmail.com>:
>>>
>>> Hi Stefan,
>>>
>>> These are some telemetry information, but I don't have history
>>> information about gc.
>>>
>>> <???? 2017-09-2 8 下午4.51.26.png>
>>> <???? 2017-09-2 8 下午4.51.11.png>
>>>
>>> 1) Yes, my state is not large.
>>> 2) My DFS is S3, but my cluster is out of AWS. It might be a problem.
>>> Since this is a POC, we might move to AWS in the future or use HDFS in the
>>> same cluster. However, how can I recognize the problem is this.
>>> 3) It seems memory usage is bounded. I'm not sure if the status showed
>>> above is fine.
>>>
>>> There is only one TM in my cluster for now, so all tasks are running on
>>> that machine. I think that means they are in the same JVM, right?
>>> Besides taking so long on asynchronous part, there is another question
>>> is that the late message showed that this task was delay for almost 7
>>> minutes, but the log showed it only took 4 minutes.
>>> It seems that it was somehow waiting for being executed. Are there some
>>> points to find out what happened?
>>>
>>> For the log information, what I means is it is hard to recognize which
>>> checkpoint id that asynchronous parts belong to if the checkpoint takes
>>> more time and there are more concurrent checkpoints taking place.
>>> Also, it seems that asynchronous part might be executed right away if
>>> there is no resource from thread pool. It is better to measure the time
>>> between creation time and processing time, and log it and checkpoint id
>>> with the original log that showed what time the asynchronous part took.
>>>
>>> Best Regards,
>>> Tony Wei
>>>
>>> 2017-09-28 16:25 GMT+08:00 Stefan Richter <s....@data-artisans.com>:
>>>
>>>> Hi,
>>>>
>>>> when the async part takes that long I would have 3 things to look at:
>>>>
>>>> 1) Is your state so large? I don’t think this applies in your case,
>>>> right?
>>>> 2) Is something wrong with writing to DFS (network, disks, etc)?
>>>> 3) Are we running low on memory on that task manager?
>>>>
>>>> Do you have telemetry information about used heap and gc pressure on
>>>> the problematic task? However, what speaks against the memory problem
>>>> hypothesis is that future checkpoints seem to go through again. What I find
>>>> very strange is that within the reported 4 minutes of the async part the
>>>> only thing that happens is: open dfs output stream, iterate the in-memory
>>>> state and write serialized state data to dfs stream, then close the stream.
>>>> No locks or waits in that section, so I would assume that for one of the
>>>> three reasons I gave, writing the state is terribly slow.
>>>>
>>>> Those snapshots should be able to run concurrently, for example so that
>>>> users can also take savepoints  even when a checkpoint was triggered and is
>>>> still running, so there is no way to guarantee that the previous parts have
>>>> finished, this is expected behaviour. Which waiting times are you missing
>>>> in the log? I think the information about when a checkpoint is triggered,
>>>> received by the TM, performing the sync and async part and acknowledgement
>>>> time should all be there?.
>>>>
>>>> Best,
>>>> Stefan
>>>>
>>>>
>>>>
>>>> Am 28.09.2017 um 08:18 schrieb Tony Wei <to...@gmail.com>:
>>>>
>>>> Hi Stefan,
>>>>
>>>> The checkpoint on my job has been subsumed again. There are some
>>>> questions that I don't understand.
>>>>
>>>> Log in JM :
>>>> 2017-09-27 13:45:15,686 INFO
>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed
>>>> checkpoint 1576 (174693180 bytes in 21597 ms).
>>>> 2017-09-27 13:49:42,795 INFO
>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
>>>> checkpoint 1577 @ 1506520182795
>>>> 2017-09-27 13:54:42,795 INFO
>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
>>>> checkpoint 1578 @ 1506520482795
>>>> 2017-09-27 13:55:13,105 INFO
>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed
>>>> checkpoint 1578 (152621410 bytes in 19109 ms).
>>>> 2017-09-27 13:56:37,103 WARN
>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late
>>>> message for now expired checkpoint attempt 1577 from
>>>> 2273da50f29b9dee731f7bd749e91c80 of job 7c039572b....
>>>> 2017-09-27 13:59:42,795 INFO
>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
>>>> checkpoint 1579 @ 1506520782795
>>>>
>>>> Log in TM:
>>>> 2017-09-27 13:56:37,105 INFO
>>>> org.apache.flink.runtime.state.DefaultOperatorStateBackend -
>>>> DefaultOperatorStateBackend snapshot (File Stream Factory @
>>>> s3://tony-dev/flink-checkpoints/7c039572b13346f1b17dcc0ace2b72c2,
>>>> asynchronous part) in thread Thread[pool-7-thread-322,5,Flink Task Threads]
>>>> took 240248 ms.
>>>>
>>>> I think the log in TM might be the late message for #1577 in JM,
>>>> because #1576, #1578 had been finished and #1579 hadn't been started at
>>>> 13:56:37.
>>>> If there is no mistake on my words, I am wondering why the time it took
>>>> was 240248 ms (4 min). It seems that it started late than asynchronous
>>>> tasks in #1578.
>>>> Is there any way to guarantee the previous asynchronous parts of
>>>> checkpoints will be executed before the following.
>>>>
>>>> Moreover, I think it will be better to have more information in INFO
>>>> log, such as waiting time and checkpoint id, in order to trace the progress
>>>> of checkpoint conveniently.
>>>>
>>>> What do you think? Do you have any suggestion for me to deal with these
>>>> problems? Thank you.
>>>>
>>>> Best Regards,
>>>> Tony Wei
>>>>
>>>> 2017-09-27 17:11 GMT+08:00 Tony Wei <to...@gmail.com>:
>>>>
>>>>> Hi Stefan,
>>>>>
>>>>> Here is the summary for my streaming job's checkpoint after restarting
>>>>> at last night.
>>>>>
>>>>> <???? 2017-09-2 7 下午4.56.30.png>
>>>>>
>>>>> This is the distribution of alignment buffered from the last 12 hours.
>>>>>
>>>>> <???? 2017-09-2 7 下午5.05.11.png>
>>>>>
>>>>> And here is the buffer out pool usage during chk #1140 ~ #1142. For
>>>>> chk #1245 and #1246, you can check the picture I sent before.
>>>>>
>>>>>  <???? 2017-09-2 7 下午5.01.24.png>
>>>>>
>>>>> AFAIK, the back pressure rate usually is in LOW status, sometimes goes
>>>>> up to HIGH, and always OK during the night.
>>>>>
>>>>> Best Regards,
>>>>> Tony Wei
>>>>>
>>>>>
>>>>> 2017-09-27 16:54 GMT+08:00 Stefan Richter <s.richter@data-artisans.com
>>>>> >:
>>>>>
>>>>>> Hi Tony,
>>>>>>
>>>>>> are your checkpoints typically close to the timeout boundary? From
>>>>>> what I see, writing the checkpoint is relatively fast but the time from the
>>>>>> checkpoint trigger to execution seems very long. This is typically the case
>>>>>> if your job has a lot of backpressure and therefore the checkpoint barriers
>>>>>> take a long time to travel to the operators, because a lot of events are
>>>>>> piling up in the buffers. Do you also experience large alignments for your
>>>>>> checkpoints?
>>>>>>
>>>>>> Best,
>>>>>> Stefan
>>>>>>
>>>>>> Am 27.09.2017 um 10:43 schrieb Tony Wei <to...@gmail.com>:
>>>>>>
>>>>>> Hi Stefan,
>>>>>>
>>>>>> It seems that I found something strange from JM's log.
>>>>>>
>>>>>> It had happened more than once before, but all subtasks would finish
>>>>>> their checkpoint attempts in the end.
>>>>>>
>>>>>> 2017-09-26 01:23:28,690 INFO
>>>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
>>>>>> checkpoint 1140 @ 1506389008690
>>>>>> 2017-09-26 01:28:28,690 INFO
>>>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
>>>>>> checkpoint 1141 @ 1506389308690
>>>>>> 2017-09-26 01:33:28,690 INFO
>>>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
>>>>>> checkpoint 1142 @ 1506389608690
>>>>>> 2017-09-26 01:33:28,691 INFO
>>>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 1140
>>>>>> expired before completing.
>>>>>> 2017-09-26 01:38:28,691 INFO
>>>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 1141
>>>>>> expired before completing.
>>>>>> 2017-09-26 01:40:38,044 WARN
>>>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late
>>>>>> message for now expired checkpoint attempt 1140 from
>>>>>> c63825d15de0fef55a1d148adcf4467e of job 7c039572b...
>>>>>> 2017-09-26 01:40:53,743 WARN
>>>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late
>>>>>> message for now expired checkpoint attempt 1141 from
>>>>>> c63825d15de0fef55a1d148adcf4467e of job 7c039572b...
>>>>>> 2017-09-26 01:41:19,332 INFO
>>>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed
>>>>>> checkpoint 1142 (136733704 bytes in 457413 ms).
>>>>>>
>>>>>> For chk #1245 and #1246, there was no late message from TM. You can
>>>>>> refer to the TM log. The full completed checkpoint attempt will have 12
>>>>>> (... asynchronous part) logs in general, but #1245 and #1246 only got 10
>>>>>> logs.
>>>>>>
>>>>>> 2017-09-26 10:08:28,690 INFO
>>>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
>>>>>> checkpoint 1245 @ 1506420508690
>>>>>> 2017-09-26 10:13:28,690 INFO
>>>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
>>>>>> checkpoint 1246 @ 1506420808690
>>>>>> 2017-09-26 10:18:28,691 INFO
>>>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 1245
>>>>>> expired before completing.
>>>>>> 2017-09-26 10:23:28,691 INFO
>>>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 1246
>>>>>> expired before completing.
>>>>>>
>>>>>> Moreover, I listed the directory for checkpoints on S3 and saw there
>>>>>> were two states not discarded successfully. In general, there will be 16
>>>>>> parts for a completed checkpoint state.
>>>>>>
>>>>>> 2017-09-26 18:08:33 36919
>>>>>> tony-dev/flink-checkpoints/7c039572b13346f1b17dcc0ace2b72c2/chk-1245/eedd7ca5-ee34-45a5-bf0b-11cc1fc67ab8
>>>>>> 2017-09-26 18:13:34 37419
>>>>>> tony-dev/flink-checkpoints/7c039572b13346f1b17dcc0ace2b72c2/chk-1246/9aa5c6c4-8c74-465d-8509-5fea4ed25af6
>>>>>>
>>>>>> Hope these informations are helpful. Thank you.
>>>>>>
>>>>>> Best Regards,
>>>>>> Tony Wei
>>>>>>
>>>>>> 2017-09-27 16:14 GMT+08:00 Stefan Richter <
>>>>>> s.richter@data-artisans.com>:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> thanks for the information. Unfortunately, I have no immediate idea
>>>>>>> what the reason is from the given information. I think most helpful could
>>>>>>> be a thread dump, but also metrics on the operator operator level to figure
>>>>>>> out which part of the pipeline is the culprit.
>>>>>>>
>>>>>>> Best,
>>>>>>> Stefan
>>>>>>>
>>>>>>> Am 26.09.2017 um 17:55 schrieb Tony Wei <to...@gmail.com>:
>>>>>>>
>>>>>>> Hi Stefan,
>>>>>>>
>>>>>>> There is no unknown exception in my full log. The Flink version is
>>>>>>> 1.3.2.
>>>>>>> My job is roughly like this.
>>>>>>>
>>>>>>> env.addSource(Kafka)
>>>>>>>   .map(ParseKeyFromRecord)
>>>>>>>   .keyBy()
>>>>>>>   .process(CountAndTimeoutWindow)
>>>>>>>   .asyncIO(UploadToS3)
>>>>>>>   .addSink(UpdateDatabase)
>>>>>>>
>>>>>>> It seemed all tasks stopped like the picture I sent in the last
>>>>>>> email.
>>>>>>>
>>>>>>> I will keep my eye on taking a thread dump from that JVM if this
>>>>>>> happens again.
>>>>>>>
>>>>>>> Best Regards,
>>>>>>> Tony Wei
>>>>>>>
>>>>>>> 2017-09-26 23:46 GMT+08:00 Stefan Richter <
>>>>>>> s.richter@data-artisans.com>:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> that is very strange indeed. I had a look at the logs and there is
>>>>>>>> no error or exception reported. I assume there is also no exception in your
>>>>>>>> full logs? Which version of flink are you using and what operators were
>>>>>>>> running in the task that stopped? If this happens again, would it be
>>>>>>>> possible to take a thread dump from that JVM?
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Stefan
>>>>>>>>
>>>>>>>> > Am 26.09.2017 um 17:08 schrieb Tony Wei <to...@gmail.com>:
>>>>>>>> >
>>>>>>>> > Hi,
>>>>>>>> >
>>>>>>>> > Something weird happened on my streaming job.
>>>>>>>> >
>>>>>>>> > I found my streaming job seems to be blocked for a long time and
>>>>>>>> I saw the situation like the picture below. (chk #1245 and #1246 were all
>>>>>>>> finishing 7/8 tasks then marked timeout by JM. Other checkpoints failed
>>>>>>>> with the same state like #1247 util I restarted TM.)
>>>>>>>> >
>>>>>>>> > <snapshot.png>
>>>>>>>> >
>>>>>>>> > I'm not sure what happened, but the consumer stopped fetching
>>>>>>>> records, buffer usage is 100% and the following task did not seem to fetch
>>>>>>>> data anymore. Just like the whole TM was stopped.
>>>>>>>> >
>>>>>>>> > However, after I restarted TM and force the job restarting from
>>>>>>>> the latest completed checkpoint, everything worked again. And I don't know
>>>>>>>> how to reproduce it.
>>>>>>>> >
>>>>>>>> > The attachment is my TM log. Because there are many user logs and
>>>>>>>> sensitive information, I only remain the log from `org.apache.flink...`.
>>>>>>>> >
>>>>>>>> > My cluster setting is one JM and one TM with 4 available slots.
>>>>>>>> >
>>>>>>>> > Streaming job uses all slots, checkpoint interval is 5 mins and
>>>>>>>> max concurrent number is 3.
>>>>>>>> >
>>>>>>>> > Please let me know if it needs more information to find out what
>>>>>>>> happened on my streaming job. Thanks for your help.
>>>>>>>> >
>>>>>>>> > Best Regards,
>>>>>>>> > Tony Wei
>>>>>>>> > <flink-root-taskmanager-0-partial.log>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>
>>>
>>
> <chk_ 1577.log>
>
>
>

Re: Stream Task seems to be blocked after checkpoint timeout

Posted by Stefan Richter <s....@data-artisans.com>.
Hi,

the gap between the sync and the async part does not mean too much. What happens per task is that all operators go through their sync part, and then one thread executes all the async parts, one after the other. So if an async part starts late, this is just because it started only after another async part finished.

I have one more question about your job, because it involves communication with external systems, like S3 and a database. Are you sure that they cannot sometimes become a bottleneck, block, and bring down your job. in particular: is the same S3 used to serve the operator and checkpointing and what is your sustained read/write rate there and the maximum number of connections? You can try to use the backpressure metric and try to identify the first operator (counting from the sink) that indicates backpressure.

Best,
Stefan

> Am 28.09.2017 um 12:59 schrieb Tony Wei <to...@gmail.com>:
> 
> Hi,
> 
> Sorry. This is the correct one.
> 
> Best Regards,
> Tony Wei
> 
> 2017-09-28 18:55 GMT+08:00 Tony Wei <tony19920430@gmail.com <ma...@gmail.com>>:
> Hi Stefan, 
> 
> Sorry for providing partial information. The attachment is the full logs for checkpoint #1577.
> 
> Why I would say it seems that asynchronous part was not executed immediately is due to all synchronous parts were all finished at 2017-09-27 13:49.
> Did that mean the checkpoint barrier event had already arrived at the operator and started as soon as when the JM triggered the checkpoint?
> 
> Best Regards,
> Tony Wei
> 
> 2017-09-28 18:22 GMT+08:00 Stefan Richter <s.richter@data-artisans.com <ma...@data-artisans.com>>:
> Hi,
> 
> I agree that the memory consumption looks good. If there is only one TM, it will run inside one JVM. As for the 7 minutes, you mean the reported end-to-end time? This time measurement starts when the checkpoint is triggered on the job manager, the first contributor is then the time that it takes for the checkpoint barrier event to travel with the stream to the operators. If there is back pressure and a lot of events are buffered, this can introduce delay to this first part, because barriers must not overtake data for correctness. After the barrier arrives at the operator, next comes the synchronous part of the checkpoint, which is typically short running and takes a snapshot of the state (think of creating an immutable version, e.g. through copy on write). In the asynchronous part, this snapshot is persisted to DFS. After that the timing stops and is reported together with the acknowledgement to the job manager. 
> 
> So, I would assume if reporting took 7 minutes end-to-end, and the async part took 4 minutes, it is likely that it took around 3 minutes for the barrier event to travel with the stream. About the debugging, I think it is hard to figure out what is going on with the DFS if you don’t have metrics on that. Maybe you could attach a sampler to the TM’s jvm and monitor where time is spend for the snapshotting?
> 
> I am also looping in Stephan, he might have more suggestions.
> 
> Best,
> Stefan
> 
>> Am 28.09.2017 um 11:25 schrieb Tony Wei <tony19920430@gmail.com <ma...@gmail.com>>:
>> 
>> Hi Stefan,
>> 
>> These are some telemetry information, but I don't have history information about gc.
>> 
>> <???? 2017-09-2 8 下午4.51.26.png>
>> <???? 2017-09-2 8 下午4.51.11.png>
>> 
>> 1) Yes, my state is not large.
>> 2) My DFS is S3, but my cluster is out of AWS. It might be a problem. Since this is a POC, we might move to AWS in the future or use HDFS in the same cluster. However, how can I recognize the problem is this.
>> 3) It seems memory usage is bounded. I'm not sure if the status showed above is fine.
>> 
>> There is only one TM in my cluster for now, so all tasks are running on that machine. I think that means they are in the same JVM, right?
>> Besides taking so long on asynchronous part, there is another question is that the late message showed that this task was delay for almost 7 minutes, but the log showed it only took 4 minutes.
>> It seems that it was somehow waiting for being executed. Are there some points to find out what happened?
>> 
>> For the log information, what I means is it is hard to recognize which checkpoint id that asynchronous parts belong to if the checkpoint takes more time and there are more concurrent checkpoints taking place.
>> Also, it seems that asynchronous part might be executed right away if there is no resource from thread pool. It is better to measure the time between creation time and processing time, and log it and checkpoint id with the original log that showed what time the asynchronous part took.
>> 
>> Best Regards,
>> Tony Wei
>> 
>> 2017-09-28 16:25 GMT+08:00 Stefan Richter <s.richter@data-artisans.com <ma...@data-artisans.com>>:
>> Hi,
>> 
>> when the async part takes that long I would have 3 things to look at:
>> 
>> 1) Is your state so large? I don’t think this applies in your case, right?
>> 2) Is something wrong with writing to DFS (network, disks, etc)?
>> 3) Are we running low on memory on that task manager?
>> 
>> Do you have telemetry information about used heap and gc pressure on the problematic task? However, what speaks against the memory problem hypothesis is that future checkpoints seem to go through again. What I find very strange is that within the reported 4 minutes of the async part the only thing that happens is: open dfs output stream, iterate the in-memory state and write serialized state data to dfs stream, then close the stream. No locks or waits in that section, so I would assume that for one of the three reasons I gave, writing the state is terribly slow.
>> 
>> Those snapshots should be able to run concurrently, for example so that users can also take savepoints  even when a checkpoint was triggered and is still running, so there is no way to guarantee that the previous parts have finished, this is expected behaviour. Which waiting times are you missing in the log? I think the information about when a checkpoint is triggered, received by the TM, performing the sync and async part and acknowledgement time should all be there?.
>> 
>> Best,
>> Stefan
>> 
>> 
>> 
>>> Am 28.09.2017 um 08:18 schrieb Tony Wei <tony19920430@gmail.com <ma...@gmail.com>>:
>>> 
>>> Hi Stefan,
>>> 
>>> The checkpoint on my job has been subsumed again. There are some questions that I don't understand.
>>> 
>>> Log in JM :
>>> 2017-09-27 13:45:15,686 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 1576 (174693180 bytes in 21597 ms).
>>> 2017-09-27 13:49:42,795 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1577 @ 1506520182795
>>> 2017-09-27 13:54:42,795 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1578 @ 1506520482795
>>> 2017-09-27 13:55:13,105 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 1578 (152621410 bytes in 19109 ms).
>>> 2017-09-27 13:56:37,103 WARN org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late message for now expired checkpoint attempt 1577 from 2273da50f29b9dee731f7bd749e91c80 of job 7c039572b....
>>> 2017-09-27 13:59:42,795 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1579 @ 1506520782795
>>> 
>>> Log in TM:
>>> 2017-09-27 13:56:37,105 INFO org.apache.flink.runtime.state.DefaultOperatorStateBackend - DefaultOperatorStateBackend snapshot (File Stream Factory @ s3://tony-dev/flink- <>checkpoints/7c039572b13346f1b17dcc0ace2b72c2, asynchronous part) in thread Thread[pool-7-thread-322,5,Flink Task Threads] took 240248 ms.
>>> 
>>> I think the log in TM might be the late message for #1577 in JM, because #1576, #1578 had been finished and #1579 hadn't been started at 13:56:37.
>>> If there is no mistake on my words, I am wondering why the time it took was 240248 ms (4 min). It seems that it started late than asynchronous tasks in #1578.
>>> Is there any way to guarantee the previous asynchronous parts of checkpoints will be executed before the following.
>>> 
>>> Moreover, I think it will be better to have more information in INFO log, such as waiting time and checkpoint id, in order to trace the progress of checkpoint conveniently.
>>> 
>>> What do you think? Do you have any suggestion for me to deal with these problems? Thank you.
>>> 
>>> Best Regards,
>>> Tony Wei
>>> 
>>> 2017-09-27 17:11 GMT+08:00 Tony Wei <tony19920430@gmail.com <ma...@gmail.com>>:
>>> Hi Stefan,
>>> 
>>> Here is the summary for my streaming job's checkpoint after restarting at last night.
>>> 
>>> <???? 2017-09-2 7 下午4.56.30.png>
>>> 
>>> This is the distribution of alignment buffered from the last 12 hours.
>>> 
>>> <???? 2017-09-2 7 下午5.05.11.png>
>>> 
>>> And here is the buffer out pool usage during chk #1140 ~ #1142. For chk #1245 and #1246, you can check the picture I sent before.
>>> 
>>>  <???? 2017-09-2 7 下午5.01.24.png>
>>> 
>>> AFAIK, the back pressure rate usually is in LOW status, sometimes goes up to HIGH, and always OK during the night.
>>> 
>>> Best Regards,
>>> Tony Wei
>>> 
>>> 
>>> 2017-09-27 16:54 GMT+08:00 Stefan Richter <s.richter@data-artisans.com <ma...@data-artisans.com>>:
>>> Hi Tony,
>>> 
>>> are your checkpoints typically close to the timeout boundary? From what I see, writing the checkpoint is relatively fast but the time from the checkpoint trigger to execution seems very long. This is typically the case if your job has a lot of backpressure and therefore the checkpoint barriers take a long time to travel to the operators, because a lot of events are piling up in the buffers. Do you also experience large alignments for your checkpoints?
>>> 
>>> Best,
>>> Stefan  
>>> 
>>>> Am 27.09.2017 um 10:43 schrieb Tony Wei <tony19920430@gmail.com <ma...@gmail.com>>:
>>>> 
>>>> Hi Stefan,
>>>> 
>>>> It seems that I found something strange from JM's log.
>>>> 
>>>> It had happened more than once before, but all subtasks would finish their checkpoint attempts in the end.
>>>> 
>>>> 2017-09-26 01:23:28,690 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1140 @ 1506389008690
>>>> 2017-09-26 01:28:28,690 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1141 @ 1506389308690
>>>> 2017-09-26 01:33:28,690 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1142 @ 1506389608690
>>>> 2017-09-26 01:33:28,691 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 1140 expired before completing.
>>>> 2017-09-26 01:38:28,691 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 1141 expired before completing.
>>>> 2017-09-26 01:40:38,044 WARN org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late message for now expired checkpoint attempt 1140 from c63825d15de0fef55a1d148adcf4467e of job 7c039572b...
>>>> 2017-09-26 01:40:53,743 WARN org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late message for now expired checkpoint attempt 1141 from c63825d15de0fef55a1d148adcf4467e of job 7c039572b...
>>>> 2017-09-26 01:41:19,332 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 1142 (136733704 bytes in 457413 ms).
>>>> 
>>>> For chk #1245 and #1246, there was no late message from TM. You can refer to the TM log. The full completed checkpoint attempt will have 12 (... asynchronous part) logs in general, but #1245 and #1246 only got 10 logs.
>>>> 
>>>> 2017-09-26 10:08:28,690 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1245 @ 1506420508690
>>>> 2017-09-26 10:13:28,690 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1246 @ 1506420808690
>>>> 2017-09-26 10:18:28,691 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 1245 expired before completing.
>>>> 2017-09-26 10:23:28,691 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 1246 expired before completing.
>>>> 
>>>> Moreover, I listed the directory for checkpoints on S3 and saw there were two states not discarded successfully. In general, there will be 16 parts for a completed checkpoint state.
>>>> 
>>>> 2017-09-26 18:08:33 36919 tony-dev/flink-checkpoints/7c039572b13346f1b17dcc0ace2b72c2/chk-1245/eedd7ca5-ee34-45a5-bf0b-11cc1fc67ab8
>>>> 2017-09-26 18:13:34 37419 tony-dev/flink-checkpoints/7c039572b13346f1b17dcc0ace2b72c2/chk-1246/9aa5c6c4-8c74-465d-8509-5fea4ed25af6
>>>> 
>>>> Hope these informations are helpful. Thank you.
>>>> 
>>>> Best Regards,
>>>> Tony Wei
>>>> 
>>>> 2017-09-27 16:14 GMT+08:00 Stefan Richter <s.richter@data-artisans.com <ma...@data-artisans.com>>:
>>>> Hi,
>>>> 
>>>> thanks for the information. Unfortunately, I have no immediate idea what the reason is from the given information. I think most helpful could be a thread dump, but also metrics on the operator operator level to figure out which part of the pipeline is the culprit.
>>>> 
>>>> Best,
>>>> Stefan
>>>> 
>>>>> Am 26.09.2017 um 17:55 schrieb Tony Wei <tony19920430@gmail.com <ma...@gmail.com>>:
>>>>> 
>>>>> Hi Stefan,
>>>>> 
>>>>> There is no unknown exception in my full log. The Flink version is 1.3.2.
>>>>> My job is roughly like this.
>>>>> 
>>>>> env.addSource(Kafka)
>>>>>   .map(ParseKeyFromRecord)
>>>>>   .keyBy()
>>>>>   .process(CountAndTimeoutWindow)
>>>>>   .asyncIO(UploadToS3)
>>>>>   .addSink(UpdateDatabase)
>>>>> 
>>>>> It seemed all tasks stopped like the picture I sent in the last email.
>>>>> 
>>>>> I will keep my eye on taking a thread dump from that JVM if this happens again.
>>>>> 
>>>>> Best Regards,
>>>>> Tony Wei
>>>>> 
>>>>> 2017-09-26 23:46 GMT+08:00 Stefan Richter <s.richter@data-artisans.com <ma...@data-artisans.com>>:
>>>>> Hi,
>>>>> 
>>>>> that is very strange indeed. I had a look at the logs and there is no error or exception reported. I assume there is also no exception in your full logs? Which version of flink are you using and what operators were running in the task that stopped? If this happens again, would it be possible to take a thread dump from that JVM?
>>>>> 
>>>>> Best,
>>>>> Stefan
>>>>> 
>>>>> > Am 26.09.2017 um 17:08 schrieb Tony Wei <tony19920430@gmail.com <ma...@gmail.com>>:
>>>>> >
>>>>> > Hi,
>>>>> >
>>>>> > Something weird happened on my streaming job.
>>>>> >
>>>>> > I found my streaming job seems to be blocked for a long time and I saw the situation like the picture below. (chk #1245 and #1246 were all finishing 7/8 tasks then marked timeout by JM. Other checkpoints failed with the same state like #1247 util I restarted TM.)
>>>>> >
>>>>> > <snapshot.png>
>>>>> >
>>>>> > I'm not sure what happened, but the consumer stopped fetching records, buffer usage is 100% and the following task did not seem to fetch data anymore. Just like the whole TM was stopped.
>>>>> >
>>>>> > However, after I restarted TM and force the job restarting from the latest completed checkpoint, everything worked again. And I don't know how to reproduce it.
>>>>> >
>>>>> > The attachment is my TM log. Because there are many user logs and sensitive information, I only remain the log from `org.apache.flink...`.
>>>>> >
>>>>> > My cluster setting is one JM and one TM with 4 available slots.
>>>>> >
>>>>> > Streaming job uses all slots, checkpoint interval is 5 mins and max concurrent number is 3.
>>>>> >
>>>>> > Please let me know if it needs more information to find out what happened on my streaming job. Thanks for your help.
>>>>> >
>>>>> > Best Regards,
>>>>> > Tony Wei
>>>>> > <flink-root-taskmanager-0-partial.log>
>>>>> 
>>>>> 
>>>> 
>>>> 
>>> 
>>> 
>>> 
>> 
>> 
> 
> 
> 
> <chk_ 1577.log>


Re: Stream Task seems to be blocked after checkpoint timeout

Posted by Tony Wei <to...@gmail.com>.
Hi,

Sorry. This is the correct one.

Best Regards,
Tony Wei

2017-09-28 18:55 GMT+08:00 Tony Wei <to...@gmail.com>:

> Hi Stefan,
>
> Sorry for providing partial information. The attachment is the full logs
> for checkpoint #1577.
>
> Why I would say it seems that asynchronous part was not executed
> immediately is due to all synchronous parts were all finished at 2017-09-27
> 13:49.
> Did that mean the checkpoint barrier event had already arrived at the
> operator and started as soon as when the JM triggered the checkpoint?
>
> Best Regards,
> Tony Wei
>
> 2017-09-28 18:22 GMT+08:00 Stefan Richter <s....@data-artisans.com>:
>
>> Hi,
>>
>> I agree that the memory consumption looks good. If there is only one TM,
>> it will run inside one JVM. As for the 7 minutes, you mean the reported
>> end-to-end time? This time measurement starts when the checkpoint is
>> triggered on the job manager, the first contributor is then the time that
>> it takes for the checkpoint barrier event to travel with the stream to the
>> operators. If there is back pressure and a lot of events are buffered, this
>> can introduce delay to this first part, because barriers must not overtake
>> data for correctness. After the barrier arrives at the operator, next comes
>> the synchronous part of the checkpoint, which is typically short running
>> and takes a snapshot of the state (think of creating an immutable version,
>> e.g. through copy on write). In the asynchronous part, this snapshot is
>> persisted to DFS. After that the timing stops and is reported together with
>> the acknowledgement to the job manager.
>>
>> So, I would assume if reporting took 7 minutes end-to-end, and the async
>> part took 4 minutes, it is likely that it took around 3 minutes for the
>> barrier event to travel with the stream. About the debugging, I think it is
>> hard to figure out what is going on with the DFS if you don’t have metrics
>> on that. Maybe you could attach a sampler to the TM’s jvm and monitor where
>> time is spend for the snapshotting?
>>
>> I am also looping in Stephan, he might have more suggestions.
>>
>> Best,
>> Stefan
>>
>> Am 28.09.2017 um 11:25 schrieb Tony Wei <to...@gmail.com>:
>>
>> Hi Stefan,
>>
>> These are some telemetry information, but I don't have history
>> information about gc.
>>
>> <???? 2017-09-2 8 下午4.51.26.png>
>> <???? 2017-09-2 8 下午4.51.11.png>
>>
>> 1) Yes, my state is not large.
>> 2) My DFS is S3, but my cluster is out of AWS. It might be a problem.
>> Since this is a POC, we might move to AWS in the future or use HDFS in the
>> same cluster. However, how can I recognize the problem is this.
>> 3) It seems memory usage is bounded. I'm not sure if the status showed
>> above is fine.
>>
>> There is only one TM in my cluster for now, so all tasks are running on
>> that machine. I think that means they are in the same JVM, right?
>> Besides taking so long on asynchronous part, there is another question is
>> that the late message showed that this task was delay for almost 7 minutes,
>> but the log showed it only took 4 minutes.
>> It seems that it was somehow waiting for being executed. Are there some
>> points to find out what happened?
>>
>> For the log information, what I means is it is hard to recognize which
>> checkpoint id that asynchronous parts belong to if the checkpoint takes
>> more time and there are more concurrent checkpoints taking place.
>> Also, it seems that asynchronous part might be executed right away if
>> there is no resource from thread pool. It is better to measure the time
>> between creation time and processing time, and log it and checkpoint id
>> with the original log that showed what time the asynchronous part took.
>>
>> Best Regards,
>> Tony Wei
>>
>> 2017-09-28 16:25 GMT+08:00 Stefan Richter <s....@data-artisans.com>:
>>
>>> Hi,
>>>
>>> when the async part takes that long I would have 3 things to look at:
>>>
>>> 1) Is your state so large? I don’t think this applies in your case,
>>> right?
>>> 2) Is something wrong with writing to DFS (network, disks, etc)?
>>> 3) Are we running low on memory on that task manager?
>>>
>>> Do you have telemetry information about used heap and gc pressure on the
>>> problematic task? However, what speaks against the memory problem
>>> hypothesis is that future checkpoints seem to go through again. What I find
>>> very strange is that within the reported 4 minutes of the async part the
>>> only thing that happens is: open dfs output stream, iterate the in-memory
>>> state and write serialized state data to dfs stream, then close the stream.
>>> No locks or waits in that section, so I would assume that for one of the
>>> three reasons I gave, writing the state is terribly slow.
>>>
>>> Those snapshots should be able to run concurrently, for example so that
>>> users can also take savepoints  even when a checkpoint was triggered and is
>>> still running, so there is no way to guarantee that the previous parts have
>>> finished, this is expected behaviour. Which waiting times are you missing
>>> in the log? I think the information about when a checkpoint is triggered,
>>> received by the TM, performing the sync and async part and acknowledgement
>>> time should all be there?.
>>>
>>> Best,
>>> Stefan
>>>
>>>
>>>
>>> Am 28.09.2017 um 08:18 schrieb Tony Wei <to...@gmail.com>:
>>>
>>> Hi Stefan,
>>>
>>> The checkpoint on my job has been subsumed again. There are some
>>> questions that I don't understand.
>>>
>>> Log in JM :
>>> 2017-09-27 13:45:15,686 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>> - Completed checkpoint 1576 (174693180 bytes in 21597 ms).
>>> 2017-09-27 13:49:42,795 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>> - Triggering checkpoint 1577 @ 1506520182795
>>> 2017-09-27 13:54:42,795 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>> - Triggering checkpoint 1578 @ 1506520482795
>>> 2017-09-27 13:55:13,105 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>> - Completed checkpoint 1578 (152621410 bytes in 19109 ms).
>>> 2017-09-27 13:56:37,103 WARN org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>> - Received late message for now expired checkpoint attempt 1577 from
>>> 2273da50f29b9dee731f7bd749e91c80 of job 7c039572b....
>>> 2017-09-27 13:59:42,795 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>> - Triggering checkpoint 1579 @ 1506520782795
>>>
>>> Log in TM:
>>> 2017-09-27 13:56:37,105 INFO org.apache.flink.runtime.state.DefaultOperatorStateBackend
>>> - DefaultOperatorStateBackend snapshot (File Stream Factory @
>>> s3://tony-dev/flink-checkpoints/7c039572b13346f1b17dcc0ace2b72c2,
>>> asynchronous part) in thread Thread[pool-7-thread-322,5,Flink Task
>>> Threads] took 240248 ms.
>>>
>>> I think the log in TM might be the late message for #1577 in JM, because
>>> #1576, #1578 had been finished and #1579 hadn't been started at 13:56:37.
>>> If there is no mistake on my words, I am wondering why the time it took
>>> was 240248 ms (4 min). It seems that it started late than asynchronous
>>> tasks in #1578.
>>> Is there any way to guarantee the previous asynchronous parts of
>>> checkpoints will be executed before the following.
>>>
>>> Moreover, I think it will be better to have more information in INFO
>>> log, such as waiting time and checkpoint id, in order to trace the progress
>>> of checkpoint conveniently.
>>>
>>> What do you think? Do you have any suggestion for me to deal with these
>>> problems? Thank you.
>>>
>>> Best Regards,
>>> Tony Wei
>>>
>>> 2017-09-27 17:11 GMT+08:00 Tony Wei <to...@gmail.com>:
>>>
>>>> Hi Stefan,
>>>>
>>>> Here is the summary for my streaming job's checkpoint after restarting
>>>> at last night.
>>>>
>>>> <???? 2017-09-2 7 下午4.56.30.png>
>>>>
>>>> This is the distribution of alignment buffered from the last 12 hours.
>>>>
>>>> <???? 2017-09-2 7 下午5.05.11.png>
>>>>
>>>> And here is the buffer out pool usage during chk #1140 ~ #1142. For chk
>>>> #1245 and #1246, you can check the picture I sent before.
>>>>
>>>>  <???? 2017-09-2 7 下午5.01.24.png>
>>>>
>>>> AFAIK, the back pressure rate usually is in LOW status, sometimes goes
>>>> up to HIGH, and always OK during the night.
>>>>
>>>> Best Regards,
>>>> Tony Wei
>>>>
>>>>
>>>> 2017-09-27 16:54 GMT+08:00 Stefan Richter <s....@data-artisans.com>
>>>> :
>>>>
>>>>> Hi Tony,
>>>>>
>>>>> are your checkpoints typically close to the timeout boundary? From
>>>>> what I see, writing the checkpoint is relatively fast but the time from the
>>>>> checkpoint trigger to execution seems very long. This is typically the case
>>>>> if your job has a lot of backpressure and therefore the checkpoint barriers
>>>>> take a long time to travel to the operators, because a lot of events are
>>>>> piling up in the buffers. Do you also experience large alignments for your
>>>>> checkpoints?
>>>>>
>>>>> Best,
>>>>> Stefan
>>>>>
>>>>> Am 27.09.2017 um 10:43 schrieb Tony Wei <to...@gmail.com>:
>>>>>
>>>>> Hi Stefan,
>>>>>
>>>>> It seems that I found something strange from JM's log.
>>>>>
>>>>> It had happened more than once before, but all subtasks would finish
>>>>> their checkpoint attempts in the end.
>>>>>
>>>>> 2017-09-26 01:23:28,690 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>>> - Triggering checkpoint 1140 @ 1506389008690
>>>>> 2017-09-26 01:28:28,690 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>>> - Triggering checkpoint 1141 @ 1506389308690
>>>>> 2017-09-26 01:33:28,690 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>>> - Triggering checkpoint 1142 @ 1506389608690
>>>>> 2017-09-26 01:33:28,691 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>>> - Checkpoint 1140 expired before completing.
>>>>> 2017-09-26 01:38:28,691 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>>> - Checkpoint 1141 expired before completing.
>>>>> 2017-09-26 01:40:38,044 WARN org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>>> - Received late message for now expired checkpoint attempt 1140 from
>>>>> c63825d15de0fef55a1d148adcf4467e of job 7c039572b...
>>>>> 2017-09-26 01:40:53,743 WARN org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>>> - Received late message for now expired checkpoint attempt 1141 from
>>>>> c63825d15de0fef55a1d148adcf4467e of job 7c039572b...
>>>>> 2017-09-26 01:41:19,332 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>>> - Completed checkpoint 1142 (136733704 bytes in 457413 ms).
>>>>>
>>>>> For chk #1245 and #1246, there was no late message from TM. You can
>>>>> refer to the TM log. The full completed checkpoint attempt will have 12
>>>>> (... asynchronous part) logs in general, but #1245 and #1246 only got 10
>>>>> logs.
>>>>>
>>>>> 2017-09-26 10:08:28,690 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>>> - Triggering checkpoint 1245 @ 1506420508690
>>>>> 2017-09-26 10:13:28,690 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>>> - Triggering checkpoint 1246 @ 1506420808690
>>>>> 2017-09-26 10:18:28,691 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>>> - Checkpoint 1245 expired before completing.
>>>>> 2017-09-26 10:23:28,691 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>>> - Checkpoint 1246 expired before completing.
>>>>>
>>>>> Moreover, I listed the directory for checkpoints on S3 and saw there
>>>>> were two states not discarded successfully. In general, there will be 16
>>>>> parts for a completed checkpoint state.
>>>>>
>>>>> 2017-09-26 18:08:33 36919 tony-dev/flink-checkpoints/7c0
>>>>> 39572b13346f1b17dcc0ace2b72c2/chk-1245/eedd7ca5-ee34-45a5-bf
>>>>> 0b-11cc1fc67ab8
>>>>> 2017-09-26 18:13:34 37419 tony-dev/flink-checkpoints/7c0
>>>>> 39572b13346f1b17dcc0ace2b72c2/chk-1246/9aa5c6c4-8c74-465d-85
>>>>> 09-5fea4ed25af6
>>>>>
>>>>> Hope these informations are helpful. Thank you.
>>>>>
>>>>> Best Regards,
>>>>> Tony Wei
>>>>>
>>>>> 2017-09-27 16:14 GMT+08:00 Stefan Richter <s.richter@data-artisans.com
>>>>> >:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> thanks for the information. Unfortunately, I have no immediate idea
>>>>>> what the reason is from the given information. I think most helpful could
>>>>>> be a thread dump, but also metrics on the operator operator level to figure
>>>>>> out which part of the pipeline is the culprit.
>>>>>>
>>>>>> Best,
>>>>>> Stefan
>>>>>>
>>>>>> Am 26.09.2017 um 17:55 schrieb Tony Wei <to...@gmail.com>:
>>>>>>
>>>>>> Hi Stefan,
>>>>>>
>>>>>> There is no unknown exception in my full log. The Flink version is
>>>>>> 1.3.2.
>>>>>> My job is roughly like this.
>>>>>>
>>>>>> env.addSource(Kafka)
>>>>>>   .map(ParseKeyFromRecord)
>>>>>>   .keyBy()
>>>>>>   .process(CountAndTimeoutWindow)
>>>>>>   .asyncIO(UploadToS3)
>>>>>>   .addSink(UpdateDatabase)
>>>>>>
>>>>>> It seemed all tasks stopped like the picture I sent in the last email.
>>>>>>
>>>>>> I will keep my eye on taking a thread dump from that JVM if this
>>>>>> happens again.
>>>>>>
>>>>>> Best Regards,
>>>>>> Tony Wei
>>>>>>
>>>>>> 2017-09-26 23:46 GMT+08:00 Stefan Richter <
>>>>>> s.richter@data-artisans.com>:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> that is very strange indeed. I had a look at the logs and there is
>>>>>>> no error or exception reported. I assume there is also no exception in your
>>>>>>> full logs? Which version of flink are you using and what operators were
>>>>>>> running in the task that stopped? If this happens again, would it be
>>>>>>> possible to take a thread dump from that JVM?
>>>>>>>
>>>>>>> Best,
>>>>>>> Stefan
>>>>>>>
>>>>>>> > Am 26.09.2017 um 17:08 schrieb Tony Wei <to...@gmail.com>:
>>>>>>> >
>>>>>>> > Hi,
>>>>>>> >
>>>>>>> > Something weird happened on my streaming job.
>>>>>>> >
>>>>>>> > I found my streaming job seems to be blocked for a long time and I
>>>>>>> saw the situation like the picture below. (chk #1245 and #1246 were all
>>>>>>> finishing 7/8 tasks then marked timeout by JM. Other checkpoints failed
>>>>>>> with the same state like #1247 util I restarted TM.)
>>>>>>> >
>>>>>>> > <snapshot.png>
>>>>>>> >
>>>>>>> > I'm not sure what happened, but the consumer stopped fetching
>>>>>>> records, buffer usage is 100% and the following task did not seem to fetch
>>>>>>> data anymore. Just like the whole TM was stopped.
>>>>>>> >
>>>>>>> > However, after I restarted TM and force the job restarting from
>>>>>>> the latest completed checkpoint, everything worked again. And I don't know
>>>>>>> how to reproduce it.
>>>>>>> >
>>>>>>> > The attachment is my TM log. Because there are many user logs and
>>>>>>> sensitive information, I only remain the log from `org.apache.flink...`.
>>>>>>> >
>>>>>>> > My cluster setting is one JM and one TM with 4 available slots.
>>>>>>> >
>>>>>>> > Streaming job uses all slots, checkpoint interval is 5 mins and
>>>>>>> max concurrent number is 3.
>>>>>>> >
>>>>>>> > Please let me know if it needs more information to find out what
>>>>>>> happened on my streaming job. Thanks for your help.
>>>>>>> >
>>>>>>> > Best Regards,
>>>>>>> > Tony Wei
>>>>>>> > <flink-root-taskmanager-0-partial.log>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>>
>>
>>
>

Re: Stream Task seems to be blocked after checkpoint timeout

Posted by Tony Wei <to...@gmail.com>.
Hi Stefan,

Sorry for providing partial information. The attachment is the full logs
for checkpoint #1577.

Why I would say it seems that asynchronous part was not executed
immediately is due to all synchronous parts were all finished at 2017-09-27
13:49.
Did that mean the checkpoint barrier event had already arrived at the
operator and started as soon as when the JM triggered the checkpoint?

Best Regards,
Tony Wei

2017-09-28 18:22 GMT+08:00 Stefan Richter <s....@data-artisans.com>:

> Hi,
>
> I agree that the memory consumption looks good. If there is only one TM,
> it will run inside one JVM. As for the 7 minutes, you mean the reported
> end-to-end time? This time measurement starts when the checkpoint is
> triggered on the job manager, the first contributor is then the time that
> it takes for the checkpoint barrier event to travel with the stream to the
> operators. If there is back pressure and a lot of events are buffered, this
> can introduce delay to this first part, because barriers must not overtake
> data for correctness. After the barrier arrives at the operator, next comes
> the synchronous part of the checkpoint, which is typically short running
> and takes a snapshot of the state (think of creating an immutable version,
> e.g. through copy on write). In the asynchronous part, this snapshot is
> persisted to DFS. After that the timing stops and is reported together with
> the acknowledgement to the job manager.
>
> So, I would assume if reporting took 7 minutes end-to-end, and the async
> part took 4 minutes, it is likely that it took around 3 minutes for the
> barrier event to travel with the stream. About the debugging, I think it is
> hard to figure out what is going on with the DFS if you don’t have metrics
> on that. Maybe you could attach a sampler to the TM’s jvm and monitor where
> time is spend for the snapshotting?
>
> I am also looping in Stephan, he might have more suggestions.
>
> Best,
> Stefan
>
> Am 28.09.2017 um 11:25 schrieb Tony Wei <to...@gmail.com>:
>
> Hi Stefan,
>
> These are some telemetry information, but I don't have history information
> about gc.
>
> <???? 2017-09-2 8 下午4.51.26.png>
> <???? 2017-09-2 8 下午4.51.11.png>
>
> 1) Yes, my state is not large.
> 2) My DFS is S3, but my cluster is out of AWS. It might be a problem.
> Since this is a POC, we might move to AWS in the future or use HDFS in the
> same cluster. However, how can I recognize the problem is this.
> 3) It seems memory usage is bounded. I'm not sure if the status showed
> above is fine.
>
> There is only one TM in my cluster for now, so all tasks are running on
> that machine. I think that means they are in the same JVM, right?
> Besides taking so long on asynchronous part, there is another question is
> that the late message showed that this task was delay for almost 7 minutes,
> but the log showed it only took 4 minutes.
> It seems that it was somehow waiting for being executed. Are there some
> points to find out what happened?
>
> For the log information, what I means is it is hard to recognize which
> checkpoint id that asynchronous parts belong to if the checkpoint takes
> more time and there are more concurrent checkpoints taking place.
> Also, it seems that asynchronous part might be executed right away if
> there is no resource from thread pool. It is better to measure the time
> between creation time and processing time, and log it and checkpoint id
> with the original log that showed what time the asynchronous part took.
>
> Best Regards,
> Tony Wei
>
> 2017-09-28 16:25 GMT+08:00 Stefan Richter <s....@data-artisans.com>:
>
>> Hi,
>>
>> when the async part takes that long I would have 3 things to look at:
>>
>> 1) Is your state so large? I don’t think this applies in your case, right?
>> 2) Is something wrong with writing to DFS (network, disks, etc)?
>> 3) Are we running low on memory on that task manager?
>>
>> Do you have telemetry information about used heap and gc pressure on the
>> problematic task? However, what speaks against the memory problem
>> hypothesis is that future checkpoints seem to go through again. What I find
>> very strange is that within the reported 4 minutes of the async part the
>> only thing that happens is: open dfs output stream, iterate the in-memory
>> state and write serialized state data to dfs stream, then close the stream.
>> No locks or waits in that section, so I would assume that for one of the
>> three reasons I gave, writing the state is terribly slow.
>>
>> Those snapshots should be able to run concurrently, for example so that
>> users can also take savepoints  even when a checkpoint was triggered and is
>> still running, so there is no way to guarantee that the previous parts have
>> finished, this is expected behaviour. Which waiting times are you missing
>> in the log? I think the information about when a checkpoint is triggered,
>> received by the TM, performing the sync and async part and acknowledgement
>> time should all be there?.
>>
>> Best,
>> Stefan
>>
>>
>>
>> Am 28.09.2017 um 08:18 schrieb Tony Wei <to...@gmail.com>:
>>
>> Hi Stefan,
>>
>> The checkpoint on my job has been subsumed again. There are some
>> questions that I don't understand.
>>
>> Log in JM :
>> 2017-09-27 13:45:15,686 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>> - Completed checkpoint 1576 (174693180 bytes in 21597 ms).
>> 2017-09-27 13:49:42,795 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>> - Triggering checkpoint 1577 @ 1506520182795
>> 2017-09-27 13:54:42,795 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>> - Triggering checkpoint 1578 @ 1506520482795
>> 2017-09-27 13:55:13,105 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>> - Completed checkpoint 1578 (152621410 bytes in 19109 ms).
>> 2017-09-27 13:56:37,103 WARN org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>> - Received late message for now expired checkpoint attempt 1577 from
>> 2273da50f29b9dee731f7bd749e91c80 of job 7c039572b....
>> 2017-09-27 13:59:42,795 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>> - Triggering checkpoint 1579 @ 1506520782795
>>
>> Log in TM:
>> 2017-09-27 13:56:37,105 INFO org.apache.flink.runtime.state.DefaultOperatorStateBackend
>> - DefaultOperatorStateBackend snapshot (File Stream Factory @
>> s3://tony-dev/flink-checkpoints/7c039572b13346f1b17dcc0ace2b72c2,
>> asynchronous part) in thread Thread[pool-7-thread-322,5,Flink Task
>> Threads] took 240248 ms.
>>
>> I think the log in TM might be the late message for #1577 in JM, because
>> #1576, #1578 had been finished and #1579 hadn't been started at 13:56:37.
>> If there is no mistake on my words, I am wondering why the time it took
>> was 240248 ms (4 min). It seems that it started late than asynchronous
>> tasks in #1578.
>> Is there any way to guarantee the previous asynchronous parts of
>> checkpoints will be executed before the following.
>>
>> Moreover, I think it will be better to have more information in INFO log,
>> such as waiting time and checkpoint id, in order to trace the progress of
>> checkpoint conveniently.
>>
>> What do you think? Do you have any suggestion for me to deal with these
>> problems? Thank you.
>>
>> Best Regards,
>> Tony Wei
>>
>> 2017-09-27 17:11 GMT+08:00 Tony Wei <to...@gmail.com>:
>>
>>> Hi Stefan,
>>>
>>> Here is the summary for my streaming job's checkpoint after restarting
>>> at last night.
>>>
>>> <???? 2017-09-2 7 下午4.56.30.png>
>>>
>>> This is the distribution of alignment buffered from the last 12 hours.
>>>
>>> <???? 2017-09-2 7 下午5.05.11.png>
>>>
>>> And here is the buffer out pool usage during chk #1140 ~ #1142. For chk
>>> #1245 and #1246, you can check the picture I sent before.
>>>
>>>  <???? 2017-09-2 7 下午5.01.24.png>
>>>
>>> AFAIK, the back pressure rate usually is in LOW status, sometimes goes
>>> up to HIGH, and always OK during the night.
>>>
>>> Best Regards,
>>> Tony Wei
>>>
>>>
>>> 2017-09-27 16:54 GMT+08:00 Stefan Richter <s....@data-artisans.com>:
>>>
>>>> Hi Tony,
>>>>
>>>> are your checkpoints typically close to the timeout boundary? From what
>>>> I see, writing the checkpoint is relatively fast but the time from the
>>>> checkpoint trigger to execution seems very long. This is typically the case
>>>> if your job has a lot of backpressure and therefore the checkpoint barriers
>>>> take a long time to travel to the operators, because a lot of events are
>>>> piling up in the buffers. Do you also experience large alignments for your
>>>> checkpoints?
>>>>
>>>> Best,
>>>> Stefan
>>>>
>>>> Am 27.09.2017 um 10:43 schrieb Tony Wei <to...@gmail.com>:
>>>>
>>>> Hi Stefan,
>>>>
>>>> It seems that I found something strange from JM's log.
>>>>
>>>> It had happened more than once before, but all subtasks would finish
>>>> their checkpoint attempts in the end.
>>>>
>>>> 2017-09-26 01:23:28,690 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>> - Triggering checkpoint 1140 @ 1506389008690
>>>> 2017-09-26 01:28:28,690 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>> - Triggering checkpoint 1141 @ 1506389308690
>>>> 2017-09-26 01:33:28,690 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>> - Triggering checkpoint 1142 @ 1506389608690
>>>> 2017-09-26 01:33:28,691 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>> - Checkpoint 1140 expired before completing.
>>>> 2017-09-26 01:38:28,691 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>> - Checkpoint 1141 expired before completing.
>>>> 2017-09-26 01:40:38,044 WARN org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>> - Received late message for now expired checkpoint attempt 1140 from
>>>> c63825d15de0fef55a1d148adcf4467e of job 7c039572b...
>>>> 2017-09-26 01:40:53,743 WARN org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>> - Received late message for now expired checkpoint attempt 1141 from
>>>> c63825d15de0fef55a1d148adcf4467e of job 7c039572b...
>>>> 2017-09-26 01:41:19,332 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>> - Completed checkpoint 1142 (136733704 bytes in 457413 ms).
>>>>
>>>> For chk #1245 and #1246, there was no late message from TM. You can
>>>> refer to the TM log. The full completed checkpoint attempt will have 12
>>>> (... asynchronous part) logs in general, but #1245 and #1246 only got 10
>>>> logs.
>>>>
>>>> 2017-09-26 10:08:28,690 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>> - Triggering checkpoint 1245 @ 1506420508690
>>>> 2017-09-26 10:13:28,690 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>> - Triggering checkpoint 1246 @ 1506420808690
>>>> 2017-09-26 10:18:28,691 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>> - Checkpoint 1245 expired before completing.
>>>> 2017-09-26 10:23:28,691 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>>> - Checkpoint 1246 expired before completing.
>>>>
>>>> Moreover, I listed the directory for checkpoints on S3 and saw there
>>>> were two states not discarded successfully. In general, there will be 16
>>>> parts for a completed checkpoint state.
>>>>
>>>> 2017-09-26 18:08:33 36919 tony-dev/flink-checkpoints/7c0
>>>> 39572b13346f1b17dcc0ace2b72c2/chk-1245/eedd7ca5-ee34-45a5-bf
>>>> 0b-11cc1fc67ab8
>>>> 2017-09-26 18:13:34 37419 tony-dev/flink-checkpoints/7c0
>>>> 39572b13346f1b17dcc0ace2b72c2/chk-1246/9aa5c6c4-8c74-465d-85
>>>> 09-5fea4ed25af6
>>>>
>>>> Hope these informations are helpful. Thank you.
>>>>
>>>> Best Regards,
>>>> Tony Wei
>>>>
>>>> 2017-09-27 16:14 GMT+08:00 Stefan Richter <s....@data-artisans.com>
>>>> :
>>>>
>>>>> Hi,
>>>>>
>>>>> thanks for the information. Unfortunately, I have no immediate idea
>>>>> what the reason is from the given information. I think most helpful could
>>>>> be a thread dump, but also metrics on the operator operator level to figure
>>>>> out which part of the pipeline is the culprit.
>>>>>
>>>>> Best,
>>>>> Stefan
>>>>>
>>>>> Am 26.09.2017 um 17:55 schrieb Tony Wei <to...@gmail.com>:
>>>>>
>>>>> Hi Stefan,
>>>>>
>>>>> There is no unknown exception in my full log. The Flink version is
>>>>> 1.3.2.
>>>>> My job is roughly like this.
>>>>>
>>>>> env.addSource(Kafka)
>>>>>   .map(ParseKeyFromRecord)
>>>>>   .keyBy()
>>>>>   .process(CountAndTimeoutWindow)
>>>>>   .asyncIO(UploadToS3)
>>>>>   .addSink(UpdateDatabase)
>>>>>
>>>>> It seemed all tasks stopped like the picture I sent in the last email.
>>>>>
>>>>> I will keep my eye on taking a thread dump from that JVM if this
>>>>> happens again.
>>>>>
>>>>> Best Regards,
>>>>> Tony Wei
>>>>>
>>>>> 2017-09-26 23:46 GMT+08:00 Stefan Richter <s.richter@data-artisans.com
>>>>> >:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> that is very strange indeed. I had a look at the logs and there is no
>>>>>> error or exception reported. I assume there is also no exception in your
>>>>>> full logs? Which version of flink are you using and what operators were
>>>>>> running in the task that stopped? If this happens again, would it be
>>>>>> possible to take a thread dump from that JVM?
>>>>>>
>>>>>> Best,
>>>>>> Stefan
>>>>>>
>>>>>> > Am 26.09.2017 um 17:08 schrieb Tony Wei <to...@gmail.com>:
>>>>>> >
>>>>>> > Hi,
>>>>>> >
>>>>>> > Something weird happened on my streaming job.
>>>>>> >
>>>>>> > I found my streaming job seems to be blocked for a long time and I
>>>>>> saw the situation like the picture below. (chk #1245 and #1246 were all
>>>>>> finishing 7/8 tasks then marked timeout by JM. Other checkpoints failed
>>>>>> with the same state like #1247 util I restarted TM.)
>>>>>> >
>>>>>> > <snapshot.png>
>>>>>> >
>>>>>> > I'm not sure what happened, but the consumer stopped fetching
>>>>>> records, buffer usage is 100% and the following task did not seem to fetch
>>>>>> data anymore. Just like the whole TM was stopped.
>>>>>> >
>>>>>> > However, after I restarted TM and force the job restarting from the
>>>>>> latest completed checkpoint, everything worked again. And I don't know how
>>>>>> to reproduce it.
>>>>>> >
>>>>>> > The attachment is my TM log. Because there are many user logs and
>>>>>> sensitive information, I only remain the log from `org.apache.flink...`.
>>>>>> >
>>>>>> > My cluster setting is one JM and one TM with 4 available slots.
>>>>>> >
>>>>>> > Streaming job uses all slots, checkpoint interval is 5 mins and max
>>>>>> concurrent number is 3.
>>>>>> >
>>>>>> > Please let me know if it needs more information to find out what
>>>>>> happened on my streaming job. Thanks for your help.
>>>>>> >
>>>>>> > Best Regards,
>>>>>> > Tony Wei
>>>>>> > <flink-root-taskmanager-0-partial.log>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>
>>
>>
>
>

Re: Stream Task seems to be blocked after checkpoint timeout

Posted by Stefan Richter <s....@data-artisans.com>.
Hi,

I agree that the memory consumption looks good. If there is only one TM, it will run inside one JVM. As for the 7 minutes, you mean the reported end-to-end time? This time measurement starts when the checkpoint is triggered on the job manager, the first contributor is then the time that it takes for the checkpoint barrier event to travel with the stream to the operators. If there is back pressure and a lot of events are buffered, this can introduce delay to this first part, because barriers must not overtake data for correctness. After the barrier arrives at the operator, next comes the synchronous part of the checkpoint, which is typically short running and takes a snapshot of the state (think of creating an immutable version, e.g. through copy on write). In the asynchronous part, this snapshot is persisted to DFS. After that the timing stops and is reported together with the acknowledgement to the job manager. 

So, I would assume if reporting took 7 minutes end-to-end, and the async part took 4 minutes, it is likely that it took around 3 minutes for the barrier event to travel with the stream. About the debugging, I think it is hard to figure out what is going on with the DFS if you don’t have metrics on that. Maybe you could attach a sampler to the TM’s jvm and monitor where time is spend for the snapshotting?

I am also looping in Stephan, he might have more suggestions.

Best,
Stefan

> Am 28.09.2017 um 11:25 schrieb Tony Wei <to...@gmail.com>:
> 
> Hi Stefan,
> 
> These are some telemetry information, but I don't have history information about gc.
> 
> <???? 2017-09-2 8 下午4.51.26.png>
> <???? 2017-09-2 8 下午4.51.11.png>
> 
> 1) Yes, my state is not large.
> 2) My DFS is S3, but my cluster is out of AWS. It might be a problem. Since this is a POC, we might move to AWS in the future or use HDFS in the same cluster. However, how can I recognize the problem is this.
> 3) It seems memory usage is bounded. I'm not sure if the status showed above is fine.
> 
> There is only one TM in my cluster for now, so all tasks are running on that machine. I think that means they are in the same JVM, right?
> Besides taking so long on asynchronous part, there is another question is that the late message showed that this task was delay for almost 7 minutes, but the log showed it only took 4 minutes.
> It seems that it was somehow waiting for being executed. Are there some points to find out what happened?
> 
> For the log information, what I means is it is hard to recognize which checkpoint id that asynchronous parts belong to if the checkpoint takes more time and there are more concurrent checkpoints taking place.
> Also, it seems that asynchronous part might be executed right away if there is no resource from thread pool. It is better to measure the time between creation time and processing time, and log it and checkpoint id with the original log that showed what time the asynchronous part took.
> 
> Best Regards,
> Tony Wei
> 
> 2017-09-28 16:25 GMT+08:00 Stefan Richter <s.richter@data-artisans.com <ma...@data-artisans.com>>:
> Hi,
> 
> when the async part takes that long I would have 3 things to look at:
> 
> 1) Is your state so large? I don’t think this applies in your case, right?
> 2) Is something wrong with writing to DFS (network, disks, etc)?
> 3) Are we running low on memory on that task manager?
> 
> Do you have telemetry information about used heap and gc pressure on the problematic task? However, what speaks against the memory problem hypothesis is that future checkpoints seem to go through again. What I find very strange is that within the reported 4 minutes of the async part the only thing that happens is: open dfs output stream, iterate the in-memory state and write serialized state data to dfs stream, then close the stream. No locks or waits in that section, so I would assume that for one of the three reasons I gave, writing the state is terribly slow.
> 
> Those snapshots should be able to run concurrently, for example so that users can also take savepoints  even when a checkpoint was triggered and is still running, so there is no way to guarantee that the previous parts have finished, this is expected behaviour. Which waiting times are you missing in the log? I think the information about when a checkpoint is triggered, received by the TM, performing the sync and async part and acknowledgement time should all be there?.
> 
> Best,
> Stefan
> 
> 
> 
>> Am 28.09.2017 um 08:18 schrieb Tony Wei <tony19920430@gmail.com <ma...@gmail.com>>:
>> 
>> Hi Stefan,
>> 
>> The checkpoint on my job has been subsumed again. There are some questions that I don't understand.
>> 
>> Log in JM :
>> 2017-09-27 13:45:15,686 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 1576 (174693180 bytes in 21597 ms).
>> 2017-09-27 13:49:42,795 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1577 @ 1506520182795
>> 2017-09-27 13:54:42,795 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1578 @ 1506520482795
>> 2017-09-27 13:55:13,105 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 1578 (152621410 bytes in 19109 ms).
>> 2017-09-27 13:56:37,103 WARN org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late message for now expired checkpoint attempt 1577 from 2273da50f29b9dee731f7bd749e91c80 of job 7c039572b....
>> 2017-09-27 13:59:42,795 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1579 @ 1506520782795
>> 
>> Log in TM:
>> 2017-09-27 13:56:37,105 INFO org.apache.flink.runtime.state.DefaultOperatorStateBackend - DefaultOperatorStateBackend snapshot (File Stream Factory @ s3://tony-dev/flink- <>checkpoints/7c039572b13346f1b17dcc0ace2b72c2, asynchronous part) in thread Thread[pool-7-thread-322,5,Flink Task Threads] took 240248 ms.
>> 
>> I think the log in TM might be the late message for #1577 in JM, because #1576, #1578 had been finished and #1579 hadn't been started at 13:56:37.
>> If there is no mistake on my words, I am wondering why the time it took was 240248 ms (4 min). It seems that it started late than asynchronous tasks in #1578.
>> Is there any way to guarantee the previous asynchronous parts of checkpoints will be executed before the following.
>> 
>> Moreover, I think it will be better to have more information in INFO log, such as waiting time and checkpoint id, in order to trace the progress of checkpoint conveniently.
>> 
>> What do you think? Do you have any suggestion for me to deal with these problems? Thank you.
>> 
>> Best Regards,
>> Tony Wei
>> 
>> 2017-09-27 17:11 GMT+08:00 Tony Wei <tony19920430@gmail.com <ma...@gmail.com>>:
>> Hi Stefan,
>> 
>> Here is the summary for my streaming job's checkpoint after restarting at last night.
>> 
>> <???? 2017-09-2 7 下午4.56.30.png>
>> 
>> This is the distribution of alignment buffered from the last 12 hours.
>> 
>> <???? 2017-09-2 7 下午5.05.11.png>
>> 
>> And here is the buffer out pool usage during chk #1140 ~ #1142. For chk #1245 and #1246, you can check the picture I sent before.
>> 
>>  <???? 2017-09-2 7 下午5.01.24.png>
>> 
>> AFAIK, the back pressure rate usually is in LOW status, sometimes goes up to HIGH, and always OK during the night.
>> 
>> Best Regards,
>> Tony Wei
>> 
>> 
>> 2017-09-27 16:54 GMT+08:00 Stefan Richter <s.richter@data-artisans.com <ma...@data-artisans.com>>:
>> Hi Tony,
>> 
>> are your checkpoints typically close to the timeout boundary? From what I see, writing the checkpoint is relatively fast but the time from the checkpoint trigger to execution seems very long. This is typically the case if your job has a lot of backpressure and therefore the checkpoint barriers take a long time to travel to the operators, because a lot of events are piling up in the buffers. Do you also experience large alignments for your checkpoints?
>> 
>> Best,
>> Stefan  
>> 
>>> Am 27.09.2017 um 10:43 schrieb Tony Wei <tony19920430@gmail.com <ma...@gmail.com>>:
>>> 
>>> Hi Stefan,
>>> 
>>> It seems that I found something strange from JM's log.
>>> 
>>> It had happened more than once before, but all subtasks would finish their checkpoint attempts in the end.
>>> 
>>> 2017-09-26 01:23:28,690 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1140 @ 1506389008690
>>> 2017-09-26 01:28:28,690 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1141 @ 1506389308690
>>> 2017-09-26 01:33:28,690 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1142 @ 1506389608690
>>> 2017-09-26 01:33:28,691 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 1140 expired before completing.
>>> 2017-09-26 01:38:28,691 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 1141 expired before completing.
>>> 2017-09-26 01:40:38,044 WARN org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late message for now expired checkpoint attempt 1140 from c63825d15de0fef55a1d148adcf4467e of job 7c039572b...
>>> 2017-09-26 01:40:53,743 WARN org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late message for now expired checkpoint attempt 1141 from c63825d15de0fef55a1d148adcf4467e of job 7c039572b...
>>> 2017-09-26 01:41:19,332 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 1142 (136733704 bytes in 457413 ms).
>>> 
>>> For chk #1245 and #1246, there was no late message from TM. You can refer to the TM log. The full completed checkpoint attempt will have 12 (... asynchronous part) logs in general, but #1245 and #1246 only got 10 logs.
>>> 
>>> 2017-09-26 10:08:28,690 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1245 @ 1506420508690
>>> 2017-09-26 10:13:28,690 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1246 @ 1506420808690
>>> 2017-09-26 10:18:28,691 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 1245 expired before completing.
>>> 2017-09-26 10:23:28,691 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 1246 expired before completing.
>>> 
>>> Moreover, I listed the directory for checkpoints on S3 and saw there were two states not discarded successfully. In general, there will be 16 parts for a completed checkpoint state.
>>> 
>>> 2017-09-26 18:08:33 36919 tony-dev/flink-checkpoints/7c039572b13346f1b17dcc0ace2b72c2/chk-1245/eedd7ca5-ee34-45a5-bf0b-11cc1fc67ab8
>>> 2017-09-26 18:13:34 37419 tony-dev/flink-checkpoints/7c039572b13346f1b17dcc0ace2b72c2/chk-1246/9aa5c6c4-8c74-465d-8509-5fea4ed25af6
>>> 
>>> Hope these informations are helpful. Thank you.
>>> 
>>> Best Regards,
>>> Tony Wei
>>> 
>>> 2017-09-27 16:14 GMT+08:00 Stefan Richter <s.richter@data-artisans.com <ma...@data-artisans.com>>:
>>> Hi,
>>> 
>>> thanks for the information. Unfortunately, I have no immediate idea what the reason is from the given information. I think most helpful could be a thread dump, but also metrics on the operator operator level to figure out which part of the pipeline is the culprit.
>>> 
>>> Best,
>>> Stefan
>>> 
>>>> Am 26.09.2017 um 17:55 schrieb Tony Wei <tony19920430@gmail.com <ma...@gmail.com>>:
>>>> 
>>>> Hi Stefan,
>>>> 
>>>> There is no unknown exception in my full log. The Flink version is 1.3.2.
>>>> My job is roughly like this.
>>>> 
>>>> env.addSource(Kafka)
>>>>   .map(ParseKeyFromRecord)
>>>>   .keyBy()
>>>>   .process(CountAndTimeoutWindow)
>>>>   .asyncIO(UploadToS3)
>>>>   .addSink(UpdateDatabase)
>>>> 
>>>> It seemed all tasks stopped like the picture I sent in the last email.
>>>> 
>>>> I will keep my eye on taking a thread dump from that JVM if this happens again.
>>>> 
>>>> Best Regards,
>>>> Tony Wei
>>>> 
>>>> 2017-09-26 23:46 GMT+08:00 Stefan Richter <s.richter@data-artisans.com <ma...@data-artisans.com>>:
>>>> Hi,
>>>> 
>>>> that is very strange indeed. I had a look at the logs and there is no error or exception reported. I assume there is also no exception in your full logs? Which version of flink are you using and what operators were running in the task that stopped? If this happens again, would it be possible to take a thread dump from that JVM?
>>>> 
>>>> Best,
>>>> Stefan
>>>> 
>>>> > Am 26.09.2017 um 17:08 schrieb Tony Wei <tony19920430@gmail.com <ma...@gmail.com>>:
>>>> >
>>>> > Hi,
>>>> >
>>>> > Something weird happened on my streaming job.
>>>> >
>>>> > I found my streaming job seems to be blocked for a long time and I saw the situation like the picture below. (chk #1245 and #1246 were all finishing 7/8 tasks then marked timeout by JM. Other checkpoints failed with the same state like #1247 util I restarted TM.)
>>>> >
>>>> > <snapshot.png>
>>>> >
>>>> > I'm not sure what happened, but the consumer stopped fetching records, buffer usage is 100% and the following task did not seem to fetch data anymore. Just like the whole TM was stopped.
>>>> >
>>>> > However, after I restarted TM and force the job restarting from the latest completed checkpoint, everything worked again. And I don't know how to reproduce it.
>>>> >
>>>> > The attachment is my TM log. Because there are many user logs and sensitive information, I only remain the log from `org.apache.flink...`.
>>>> >
>>>> > My cluster setting is one JM and one TM with 4 available slots.
>>>> >
>>>> > Streaming job uses all slots, checkpoint interval is 5 mins and max concurrent number is 3.
>>>> >
>>>> > Please let me know if it needs more information to find out what happened on my streaming job. Thanks for your help.
>>>> >
>>>> > Best Regards,
>>>> > Tony Wei
>>>> > <flink-root-taskmanager-0-partial.log>
>>>> 
>>>> 
>>> 
>>> 
>> 
>> 
>> 
> 
> 


Re: Stream Task seems to be blocked after checkpoint timeout

Posted by Tony Wei <to...@gmail.com>.
Hi Stefan,

These are some telemetry information, but I don't have history information
about gc.

[image: 內置圖片 2]
[image: 內置圖片 1]

1) Yes, my state is not large.
2) My DFS is S3, but my cluster is out of AWS. It might be a problem. Since
this is a POC, we might move to AWS in the future or use HDFS in the same
cluster. However, how can I recognize the problem is this.
3) It seems memory usage is bounded. I'm not sure if the status showed
above is fine.

There is only one TM in my cluster for now, so all tasks are running on
that machine. I think that means they are in the same JVM, right?
Besides taking so long on asynchronous part, there is another question is
that the late message showed that this task was delay for almost 7 minutes,
but the log showed it only took 4 minutes.
It seems that it was somehow waiting for being executed. Are there some
points to find out what happened?

For the log information, what I means is it is hard to recognize which
checkpoint id that asynchronous parts belong to if the checkpoint takes
more time and there are more concurrent checkpoints taking place.
Also, it seems that asynchronous part might be executed right away if there
is no resource from thread pool. It is better to measure the time between
creation time and processing time, and log it and checkpoint id with the
original log that showed what time the asynchronous part took.

Best Regards,
Tony Wei

2017-09-28 16:25 GMT+08:00 Stefan Richter <s....@data-artisans.com>:

> Hi,
>
> when the async part takes that long I would have 3 things to look at:
>
> 1) Is your state so large? I don’t think this applies in your case, right?
> 2) Is something wrong with writing to DFS (network, disks, etc)?
> 3) Are we running low on memory on that task manager?
>
> Do you have telemetry information about used heap and gc pressure on the
> problematic task? However, what speaks against the memory problem
> hypothesis is that future checkpoints seem to go through again. What I find
> very strange is that within the reported 4 minutes of the async part the
> only thing that happens is: open dfs output stream, iterate the in-memory
> state and write serialized state data to dfs stream, then close the stream.
> No locks or waits in that section, so I would assume that for one of the
> three reasons I gave, writing the state is terribly slow.
>
> Those snapshots should be able to run concurrently, for example so that
> users can also take savepoints  even when a checkpoint was triggered and is
> still running, so there is no way to guarantee that the previous parts have
> finished, this is expected behaviour. Which waiting times are you missing
> in the log? I think the information about when a checkpoint is triggered,
> received by the TM, performing the sync and async part and acknowledgement
> time should all be there?.
>
> Best,
> Stefan
>
>
>
> Am 28.09.2017 um 08:18 schrieb Tony Wei <to...@gmail.com>:
>
> Hi Stefan,
>
> The checkpoint on my job has been subsumed again. There are some questions
> that I don't understand.
>
> Log in JM :
> 2017-09-27 13:45:15,686 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> - Completed checkpoint 1576 (174693180 bytes in 21597 ms).
> 2017-09-27 13:49:42,795 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> - Triggering checkpoint 1577 @ 1506520182795
> 2017-09-27 13:54:42,795 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> - Triggering checkpoint 1578 @ 1506520482795
> 2017-09-27 13:55:13,105 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> - Completed checkpoint 1578 (152621410 bytes in 19109 ms).
> 2017-09-27 13:56:37,103 WARN org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> - Received late message for now expired checkpoint attempt 1577 from
> 2273da50f29b9dee731f7bd749e91c80 of job 7c039572b....
> 2017-09-27 13:59:42,795 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> - Triggering checkpoint 1579 @ 1506520782795
>
> Log in TM:
> 2017-09-27 13:56:37,105 INFO org.apache.flink.runtime.state.DefaultOperatorStateBackend
> - DefaultOperatorStateBackend snapshot (File Stream Factory @
> s3://tony-dev/flink-checkpoints/7c039572b13346f1b17dcc0ace2b72c2,
> asynchronous part) in thread Thread[pool-7-thread-322,5,Flink Task
> Threads] took 240248 ms.
>
> I think the log in TM might be the late message for #1577 in JM, because
> #1576, #1578 had been finished and #1579 hadn't been started at 13:56:37.
> If there is no mistake on my words, I am wondering why the time it took
> was 240248 ms (4 min). It seems that it started late than asynchronous
> tasks in #1578.
> Is there any way to guarantee the previous asynchronous parts of
> checkpoints will be executed before the following.
>
> Moreover, I think it will be better to have more information in INFO log,
> such as waiting time and checkpoint id, in order to trace the progress of
> checkpoint conveniently.
>
> What do you think? Do you have any suggestion for me to deal with these
> problems? Thank you.
>
> Best Regards,
> Tony Wei
>
> 2017-09-27 17:11 GMT+08:00 Tony Wei <to...@gmail.com>:
>
>> Hi Stefan,
>>
>> Here is the summary for my streaming job's checkpoint after restarting at
>> last night.
>>
>> <???? 2017-09-2 7 下午4.56.30.png>
>>
>> This is the distribution of alignment buffered from the last 12 hours.
>>
>> <???? 2017-09-2 7 下午5.05.11.png>
>>
>> And here is the buffer out pool usage during chk #1140 ~ #1142. For chk
>> #1245 and #1246, you can check the picture I sent before.
>>
>>  <???? 2017-09-2 7 下午5.01.24.png>
>>
>> AFAIK, the back pressure rate usually is in LOW status, sometimes goes up
>> to HIGH, and always OK during the night.
>>
>> Best Regards,
>> Tony Wei
>>
>>
>> 2017-09-27 16:54 GMT+08:00 Stefan Richter <s....@data-artisans.com>:
>>
>>> Hi Tony,
>>>
>>> are your checkpoints typically close to the timeout boundary? From what
>>> I see, writing the checkpoint is relatively fast but the time from the
>>> checkpoint trigger to execution seems very long. This is typically the case
>>> if your job has a lot of backpressure and therefore the checkpoint barriers
>>> take a long time to travel to the operators, because a lot of events are
>>> piling up in the buffers. Do you also experience large alignments for your
>>> checkpoints?
>>>
>>> Best,
>>> Stefan
>>>
>>> Am 27.09.2017 um 10:43 schrieb Tony Wei <to...@gmail.com>:
>>>
>>> Hi Stefan,
>>>
>>> It seems that I found something strange from JM's log.
>>>
>>> It had happened more than once before, but all subtasks would finish
>>> their checkpoint attempts in the end.
>>>
>>> 2017-09-26 01:23:28,690 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>> - Triggering checkpoint 1140 @ 1506389008690
>>> 2017-09-26 01:28:28,690 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>> - Triggering checkpoint 1141 @ 1506389308690
>>> 2017-09-26 01:33:28,690 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>> - Triggering checkpoint 1142 @ 1506389608690
>>> 2017-09-26 01:33:28,691 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>> - Checkpoint 1140 expired before completing.
>>> 2017-09-26 01:38:28,691 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>> - Checkpoint 1141 expired before completing.
>>> 2017-09-26 01:40:38,044 WARN org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>> - Received late message for now expired checkpoint attempt 1140 from
>>> c63825d15de0fef55a1d148adcf4467e of job 7c039572b...
>>> 2017-09-26 01:40:53,743 WARN org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>> - Received late message for now expired checkpoint attempt 1141 from
>>> c63825d15de0fef55a1d148adcf4467e of job 7c039572b...
>>> 2017-09-26 01:41:19,332 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>> - Completed checkpoint 1142 (136733704 bytes in 457413 ms).
>>>
>>> For chk #1245 and #1246, there was no late message from TM. You can
>>> refer to the TM log. The full completed checkpoint attempt will have 12
>>> (... asynchronous part) logs in general, but #1245 and #1246 only got 10
>>> logs.
>>>
>>> 2017-09-26 10:08:28,690 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>> - Triggering checkpoint 1245 @ 1506420508690
>>> 2017-09-26 10:13:28,690 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>> - Triggering checkpoint 1246 @ 1506420808690
>>> 2017-09-26 10:18:28,691 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>> - Checkpoint 1245 expired before completing.
>>> 2017-09-26 10:23:28,691 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>> - Checkpoint 1246 expired before completing.
>>>
>>> Moreover, I listed the directory for checkpoints on S3 and saw there
>>> were two states not discarded successfully. In general, there will be 16
>>> parts for a completed checkpoint state.
>>>
>>> 2017-09-26 18:08:33 36919 tony-dev/flink-checkpoints/7c0
>>> 39572b13346f1b17dcc0ace2b72c2/chk-1245/eedd7ca5-ee34-45a5-bf
>>> 0b-11cc1fc67ab8
>>> 2017-09-26 18:13:34 37419 tony-dev/flink-checkpoints/7c0
>>> 39572b13346f1b17dcc0ace2b72c2/chk-1246/9aa5c6c4-8c74-465d-85
>>> 09-5fea4ed25af6
>>>
>>> Hope these informations are helpful. Thank you.
>>>
>>> Best Regards,
>>> Tony Wei
>>>
>>> 2017-09-27 16:14 GMT+08:00 Stefan Richter <s....@data-artisans.com>:
>>>
>>>> Hi,
>>>>
>>>> thanks for the information. Unfortunately, I have no immediate idea
>>>> what the reason is from the given information. I think most helpful could
>>>> be a thread dump, but also metrics on the operator operator level to figure
>>>> out which part of the pipeline is the culprit.
>>>>
>>>> Best,
>>>> Stefan
>>>>
>>>> Am 26.09.2017 um 17:55 schrieb Tony Wei <to...@gmail.com>:
>>>>
>>>> Hi Stefan,
>>>>
>>>> There is no unknown exception in my full log. The Flink version is
>>>> 1.3.2.
>>>> My job is roughly like this.
>>>>
>>>> env.addSource(Kafka)
>>>>   .map(ParseKeyFromRecord)
>>>>   .keyBy()
>>>>   .process(CountAndTimeoutWindow)
>>>>   .asyncIO(UploadToS3)
>>>>   .addSink(UpdateDatabase)
>>>>
>>>> It seemed all tasks stopped like the picture I sent in the last email.
>>>>
>>>> I will keep my eye on taking a thread dump from that JVM if this
>>>> happens again.
>>>>
>>>> Best Regards,
>>>> Tony Wei
>>>>
>>>> 2017-09-26 23:46 GMT+08:00 Stefan Richter <s....@data-artisans.com>
>>>> :
>>>>
>>>>> Hi,
>>>>>
>>>>> that is very strange indeed. I had a look at the logs and there is no
>>>>> error or exception reported. I assume there is also no exception in your
>>>>> full logs? Which version of flink are you using and what operators were
>>>>> running in the task that stopped? If this happens again, would it be
>>>>> possible to take a thread dump from that JVM?
>>>>>
>>>>> Best,
>>>>> Stefan
>>>>>
>>>>> > Am 26.09.2017 um 17:08 schrieb Tony Wei <to...@gmail.com>:
>>>>> >
>>>>> > Hi,
>>>>> >
>>>>> > Something weird happened on my streaming job.
>>>>> >
>>>>> > I found my streaming job seems to be blocked for a long time and I
>>>>> saw the situation like the picture below. (chk #1245 and #1246 were all
>>>>> finishing 7/8 tasks then marked timeout by JM. Other checkpoints failed
>>>>> with the same state like #1247 util I restarted TM.)
>>>>> >
>>>>> > <snapshot.png>
>>>>> >
>>>>> > I'm not sure what happened, but the consumer stopped fetching
>>>>> records, buffer usage is 100% and the following task did not seem to fetch
>>>>> data anymore. Just like the whole TM was stopped.
>>>>> >
>>>>> > However, after I restarted TM and force the job restarting from the
>>>>> latest completed checkpoint, everything worked again. And I don't know how
>>>>> to reproduce it.
>>>>> >
>>>>> > The attachment is my TM log. Because there are many user logs and
>>>>> sensitive information, I only remain the log from `org.apache.flink...`.
>>>>> >
>>>>> > My cluster setting is one JM and one TM with 4 available slots.
>>>>> >
>>>>> > Streaming job uses all slots, checkpoint interval is 5 mins and max
>>>>> concurrent number is 3.
>>>>> >
>>>>> > Please let me know if it needs more information to find out what
>>>>> happened on my streaming job. Thanks for your help.
>>>>> >
>>>>> > Best Regards,
>>>>> > Tony Wei
>>>>> > <flink-root-taskmanager-0-partial.log>
>>>>>
>>>>>
>>>>
>>>>
>>>
>>>
>>
>
>

Re: Stream Task seems to be blocked after checkpoint timeout

Posted by Stefan Richter <s....@data-artisans.com>.
Hi,

when the async part takes that long I would have 3 things to look at:

1) Is your state so large? I don’t think this applies in your case, right?
2) Is something wrong with writing to DFS (network, disks, etc)?
3) Are we running low on memory on that task manager?

Do you have telemetry information about used heap and gc pressure on the problematic task? However, what speaks against the memory problem hypothesis is that future checkpoints seem to go through again. What I find very strange is that within the reported 4 minutes of the async part the only thing that happens is: open dfs output stream, iterate the in-memory state and write serialized state data to dfs stream, then close the stream. No locks or waits in that section, so I would assume that for one of the three reasons I gave, writing the state is terribly slow.

Those snapshots should be able to run concurrently, for example so that users can also take savepoints  even when a checkpoint was triggered and is still running, so there is no way to guarantee that the previous parts have finished, this is expected behaviour. Which waiting times are you missing in the log? I think the information about when a checkpoint is triggered, received by the TM, performing the sync and async part and acknowledgement time should all be there?.

Best,
Stefan



> Am 28.09.2017 um 08:18 schrieb Tony Wei <to...@gmail.com>:
> 
> Hi Stefan,
> 
> The checkpoint on my job has been subsumed again. There are some questions that I don't understand.
> 
> Log in JM :
> 2017-09-27 13:45:15,686 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 1576 (174693180 bytes in 21597 ms).
> 2017-09-27 13:49:42,795 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1577 @ 1506520182795
> 2017-09-27 13:54:42,795 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1578 @ 1506520482795
> 2017-09-27 13:55:13,105 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 1578 (152621410 bytes in 19109 ms).
> 2017-09-27 13:56:37,103 WARN org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late message for now expired checkpoint attempt 1577 from 2273da50f29b9dee731f7bd749e91c80 of job 7c039572b....
> 2017-09-27 13:59:42,795 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1579 @ 1506520782795
> 
> Log in TM:
> 2017-09-27 13:56:37,105 INFO org.apache.flink.runtime.state.DefaultOperatorStateBackend - DefaultOperatorStateBackend snapshot (File Stream Factory @ s3://tony-dev/flink-checkpoints/7c039572b13346f1b17dcc0ace2b72c2, asynchronous part) in thread Thread[pool-7-thread-322,5,Flink Task Threads] took 240248 ms.
> 
> I think the log in TM might be the late message for #1577 in JM, because #1576, #1578 had been finished and #1579 hadn't been started at 13:56:37.
> If there is no mistake on my words, I am wondering why the time it took was 240248 ms (4 min). It seems that it started late than asynchronous tasks in #1578.
> Is there any way to guarantee the previous asynchronous parts of checkpoints will be executed before the following.
> 
> Moreover, I think it will be better to have more information in INFO log, such as waiting time and checkpoint id, in order to trace the progress of checkpoint conveniently.
> 
> What do you think? Do you have any suggestion for me to deal with these problems? Thank you.
> 
> Best Regards,
> Tony Wei
> 
> 2017-09-27 17:11 GMT+08:00 Tony Wei <tony19920430@gmail.com <ma...@gmail.com>>:
> Hi Stefan,
> 
> Here is the summary for my streaming job's checkpoint after restarting at last night.
> 
> <???? 2017-09-2 7 下午4.56.30.png>
> 
> This is the distribution of alignment buffered from the last 12 hours.
> 
> <???? 2017-09-2 7 下午5.05.11.png>
> 
> And here is the buffer out pool usage during chk #1140 ~ #1142. For chk #1245 and #1246, you can check the picture I sent before.
> 
>  <???? 2017-09-2 7 下午5.01.24.png>
> 
> AFAIK, the back pressure rate usually is in LOW status, sometimes goes up to HIGH, and always OK during the night.
> 
> Best Regards,
> Tony Wei
> 
> 
> 2017-09-27 16:54 GMT+08:00 Stefan Richter <s.richter@data-artisans.com <ma...@data-artisans.com>>:
> Hi Tony,
> 
> are your checkpoints typically close to the timeout boundary? From what I see, writing the checkpoint is relatively fast but the time from the checkpoint trigger to execution seems very long. This is typically the case if your job has a lot of backpressure and therefore the checkpoint barriers take a long time to travel to the operators, because a lot of events are piling up in the buffers. Do you also experience large alignments for your checkpoints?
> 
> Best,
> Stefan  
> 
>> Am 27.09.2017 um 10:43 schrieb Tony Wei <tony19920430@gmail.com <ma...@gmail.com>>:
>> 
>> Hi Stefan,
>> 
>> It seems that I found something strange from JM's log.
>> 
>> It had happened more than once before, but all subtasks would finish their checkpoint attempts in the end.
>> 
>> 2017-09-26 01:23:28,690 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1140 @ 1506389008690
>> 2017-09-26 01:28:28,690 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1141 @ 1506389308690
>> 2017-09-26 01:33:28,690 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1142 @ 1506389608690
>> 2017-09-26 01:33:28,691 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 1140 expired before completing.
>> 2017-09-26 01:38:28,691 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 1141 expired before completing.
>> 2017-09-26 01:40:38,044 WARN org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late message for now expired checkpoint attempt 1140 from c63825d15de0fef55a1d148adcf4467e of job 7c039572b...
>> 2017-09-26 01:40:53,743 WARN org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late message for now expired checkpoint attempt 1141 from c63825d15de0fef55a1d148adcf4467e of job 7c039572b...
>> 2017-09-26 01:41:19,332 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 1142 (136733704 bytes in 457413 ms).
>> 
>> For chk #1245 and #1246, there was no late message from TM. You can refer to the TM log. The full completed checkpoint attempt will have 12 (... asynchronous part) logs in general, but #1245 and #1246 only got 10 logs.
>> 
>> 2017-09-26 10:08:28,690 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1245 @ 1506420508690
>> 2017-09-26 10:13:28,690 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1246 @ 1506420808690
>> 2017-09-26 10:18:28,691 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 1245 expired before completing.
>> 2017-09-26 10:23:28,691 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 1246 expired before completing.
>> 
>> Moreover, I listed the directory for checkpoints on S3 and saw there were two states not discarded successfully. In general, there will be 16 parts for a completed checkpoint state.
>> 
>> 2017-09-26 18:08:33 36919 tony-dev/flink-checkpoints/7c039572b13346f1b17dcc0ace2b72c2/chk-1245/eedd7ca5-ee34-45a5-bf0b-11cc1fc67ab8
>> 2017-09-26 18:13:34 37419 tony-dev/flink-checkpoints/7c039572b13346f1b17dcc0ace2b72c2/chk-1246/9aa5c6c4-8c74-465d-8509-5fea4ed25af6
>> 
>> Hope these informations are helpful. Thank you.
>> 
>> Best Regards,
>> Tony Wei
>> 
>> 2017-09-27 16:14 GMT+08:00 Stefan Richter <s.richter@data-artisans.com <ma...@data-artisans.com>>:
>> Hi,
>> 
>> thanks for the information. Unfortunately, I have no immediate idea what the reason is from the given information. I think most helpful could be a thread dump, but also metrics on the operator operator level to figure out which part of the pipeline is the culprit.
>> 
>> Best,
>> Stefan
>> 
>>> Am 26.09.2017 um 17:55 schrieb Tony Wei <tony19920430@gmail.com <ma...@gmail.com>>:
>>> 
>>> Hi Stefan,
>>> 
>>> There is no unknown exception in my full log. The Flink version is 1.3.2.
>>> My job is roughly like this.
>>> 
>>> env.addSource(Kafka)
>>>   .map(ParseKeyFromRecord)
>>>   .keyBy()
>>>   .process(CountAndTimeoutWindow)
>>>   .asyncIO(UploadToS3)
>>>   .addSink(UpdateDatabase)
>>> 
>>> It seemed all tasks stopped like the picture I sent in the last email.
>>> 
>>> I will keep my eye on taking a thread dump from that JVM if this happens again.
>>> 
>>> Best Regards,
>>> Tony Wei
>>> 
>>> 2017-09-26 23:46 GMT+08:00 Stefan Richter <s.richter@data-artisans.com <ma...@data-artisans.com>>:
>>> Hi,
>>> 
>>> that is very strange indeed. I had a look at the logs and there is no error or exception reported. I assume there is also no exception in your full logs? Which version of flink are you using and what operators were running in the task that stopped? If this happens again, would it be possible to take a thread dump from that JVM?
>>> 
>>> Best,
>>> Stefan
>>> 
>>> > Am 26.09.2017 um 17:08 schrieb Tony Wei <tony19920430@gmail.com <ma...@gmail.com>>:
>>> >
>>> > Hi,
>>> >
>>> > Something weird happened on my streaming job.
>>> >
>>> > I found my streaming job seems to be blocked for a long time and I saw the situation like the picture below. (chk #1245 and #1246 were all finishing 7/8 tasks then marked timeout by JM. Other checkpoints failed with the same state like #1247 util I restarted TM.)
>>> >
>>> > <snapshot.png>
>>> >
>>> > I'm not sure what happened, but the consumer stopped fetching records, buffer usage is 100% and the following task did not seem to fetch data anymore. Just like the whole TM was stopped.
>>> >
>>> > However, after I restarted TM and force the job restarting from the latest completed checkpoint, everything worked again. And I don't know how to reproduce it.
>>> >
>>> > The attachment is my TM log. Because there are many user logs and sensitive information, I only remain the log from `org.apache.flink...`.
>>> >
>>> > My cluster setting is one JM and one TM with 4 available slots.
>>> >
>>> > Streaming job uses all slots, checkpoint interval is 5 mins and max concurrent number is 3.
>>> >
>>> > Please let me know if it needs more information to find out what happened on my streaming job. Thanks for your help.
>>> >
>>> > Best Regards,
>>> > Tony Wei
>>> > <flink-root-taskmanager-0-partial.log>
>>> 
>>> 
>> 
>> 
> 
> 
> 


Re: Stream Task seems to be blocked after checkpoint timeout

Posted by Tony Wei <to...@gmail.com>.
Hi Stefan,

The checkpoint on my job has been subsumed again. There are some questions
that I don't understand.

Log in JM :
2017-09-27 13:45:15,686 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator
- Completed checkpoint 1576 (174693180 bytes in 21597 ms).
2017-09-27 13:49:42,795 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator
- Triggering checkpoint 1577 @ 1506520182795
2017-09-27 13:54:42,795 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator
- Triggering checkpoint 1578 @ 1506520482795
2017-09-27 13:55:13,105 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator
- Completed checkpoint 1578 (152621410 bytes in 19109 ms).
2017-09-27 13:56:37,103 WARN
org.apache.flink.runtime.checkpoint.CheckpointCoordinator
- Received late message for now expired checkpoint attempt 1577 from
2273da50f29b9dee731f7bd749e91c80 of job 7c039572b....
2017-09-27 13:59:42,795 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator
- Triggering checkpoint 1579 @ 1506520782795

Log in TM:
2017-09-27 13:56:37,105 INFO
org.apache.flink.runtime.state.DefaultOperatorStateBackend
- DefaultOperatorStateBackend snapshot (File Stream Factory @
s3://tony-dev/flink-checkpoints/7c039572b13346f1b17dcc0ace2b72c2,
asynchronous part) in thread Thread[pool-7-thread-322,5,Flink Task Threads]
took 240248 ms.

I think the log in TM might be the late message for #1577 in JM, because
#1576, #1578 had been finished and #1579 hadn't been started at 13:56:37.
If there is no mistake on my words, I am wondering why the time it took was
240248 ms (4 min). It seems that it started late than asynchronous tasks in
#1578.
Is there any way to guarantee the previous asynchronous parts of
checkpoints will be executed before the following.

Moreover, I think it will be better to have more information in INFO log,
such as waiting time and checkpoint id, in order to trace the progress of
checkpoint conveniently.

What do you think? Do you have any suggestion for me to deal with these
problems? Thank you.

Best Regards,
Tony Wei

2017-09-27 17:11 GMT+08:00 Tony Wei <to...@gmail.com>:

> Hi Stefan,
>
> Here is the summary for my streaming job's checkpoint after restarting at
> last night.
>
> [image: 內置圖片 1]
>
> This is the distribution of alignment buffered from the last 12 hours.
>
> [image: 內置圖片 3]
>
> And here is the buffer out pool usage during chk #1140 ~ #1142. For chk
> #1245 and #1246, you can check the picture I sent before.
>
>  [image: 內置圖片 2]
>
> AFAIK, the back pressure rate usually is in LOW status, sometimes goes up
> to HIGH, and always OK during the night.
>
> Best Regards,
> Tony Wei
>
>
> 2017-09-27 16:54 GMT+08:00 Stefan Richter <s....@data-artisans.com>:
>
>> Hi Tony,
>>
>> are your checkpoints typically close to the timeout boundary? From what I
>> see, writing the checkpoint is relatively fast but the time from the
>> checkpoint trigger to execution seems very long. This is typically the case
>> if your job has a lot of backpressure and therefore the checkpoint barriers
>> take a long time to travel to the operators, because a lot of events are
>> piling up in the buffers. Do you also experience large alignments for your
>> checkpoints?
>>
>> Best,
>> Stefan
>>
>> Am 27.09.2017 um 10:43 schrieb Tony Wei <to...@gmail.com>:
>>
>> Hi Stefan,
>>
>> It seems that I found something strange from JM's log.
>>
>> It had happened more than once before, but all subtasks would finish
>> their checkpoint attempts in the end.
>>
>> 2017-09-26 01:23:28,690 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>> - Triggering checkpoint 1140 @ 1506389008690
>> 2017-09-26 01:28:28,690 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>> - Triggering checkpoint 1141 @ 1506389308690
>> 2017-09-26 01:33:28,690 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>> - Triggering checkpoint 1142 @ 1506389608690
>> 2017-09-26 01:33:28,691 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>> - Checkpoint 1140 expired before completing.
>> 2017-09-26 01:38:28,691 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>> - Checkpoint 1141 expired before completing.
>> 2017-09-26 01:40:38,044 WARN org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>> - Received late message for now expired checkpoint attempt 1140 from
>> c63825d15de0fef55a1d148adcf4467e of job 7c039572b...
>> 2017-09-26 01:40:53,743 WARN org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>> - Received late message for now expired checkpoint attempt 1141 from
>> c63825d15de0fef55a1d148adcf4467e of job 7c039572b...
>> 2017-09-26 01:41:19,332 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>> - Completed checkpoint 1142 (136733704 bytes in 457413 ms).
>>
>> For chk #1245 and #1246, there was no late message from TM. You can refer
>> to the TM log. The full completed checkpoint attempt will have 12
>> (... asynchronous part) logs in general, but #1245 and #1246 only got 10
>> logs.
>>
>> 2017-09-26 10:08:28,690 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>> - Triggering checkpoint 1245 @ 1506420508690
>> 2017-09-26 10:13:28,690 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>> - Triggering checkpoint 1246 @ 1506420808690
>> 2017-09-26 10:18:28,691 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>> - Checkpoint 1245 expired before completing.
>> 2017-09-26 10:23:28,691 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>> - Checkpoint 1246 expired before completing.
>>
>> Moreover, I listed the directory for checkpoints on S3 and saw there were
>> two states not discarded successfully. In general, there will be 16 parts
>> for a completed checkpoint state.
>>
>> 2017-09-26 18:08:33 36919 tony-dev/flink-checkpoints/7c0
>> 39572b13346f1b17dcc0ace2b72c2/chk-1245/eedd7ca5-ee34-45a5-
>> bf0b-11cc1fc67ab8
>> 2017-09-26 18:13:34 37419 tony-dev/flink-checkpoints/7c0
>> 39572b13346f1b17dcc0ace2b72c2/chk-1246/9aa5c6c4-8c74-465d-
>> 8509-5fea4ed25af6
>>
>> Hope these informations are helpful. Thank you.
>>
>> Best Regards,
>> Tony Wei
>>
>> 2017-09-27 16:14 GMT+08:00 Stefan Richter <s....@data-artisans.com>:
>>
>>> Hi,
>>>
>>> thanks for the information. Unfortunately, I have no immediate idea what
>>> the reason is from the given information. I think most helpful could be a
>>> thread dump, but also metrics on the operator operator level to figure out
>>> which part of the pipeline is the culprit.
>>>
>>> Best,
>>> Stefan
>>>
>>> Am 26.09.2017 um 17:55 schrieb Tony Wei <to...@gmail.com>:
>>>
>>> Hi Stefan,
>>>
>>> There is no unknown exception in my full log. The Flink version is 1.3.2.
>>> My job is roughly like this.
>>>
>>> env.addSource(Kafka)
>>>   .map(ParseKeyFromRecord)
>>>   .keyBy()
>>>   .process(CountAndTimeoutWindow)
>>>   .asyncIO(UploadToS3)
>>>   .addSink(UpdateDatabase)
>>>
>>> It seemed all tasks stopped like the picture I sent in the last email.
>>>
>>> I will keep my eye on taking a thread dump from that JVM if this happens
>>> again.
>>>
>>> Best Regards,
>>> Tony Wei
>>>
>>> 2017-09-26 23:46 GMT+08:00 Stefan Richter <s....@data-artisans.com>:
>>>
>>>> Hi,
>>>>
>>>> that is very strange indeed. I had a look at the logs and there is no
>>>> error or exception reported. I assume there is also no exception in your
>>>> full logs? Which version of flink are you using and what operators were
>>>> running in the task that stopped? If this happens again, would it be
>>>> possible to take a thread dump from that JVM?
>>>>
>>>> Best,
>>>> Stefan
>>>>
>>>> > Am 26.09.2017 um 17:08 schrieb Tony Wei <to...@gmail.com>:
>>>> >
>>>> > Hi,
>>>> >
>>>> > Something weird happened on my streaming job.
>>>> >
>>>> > I found my streaming job seems to be blocked for a long time and I
>>>> saw the situation like the picture below. (chk #1245 and #1246 were all
>>>> finishing 7/8 tasks then marked timeout by JM. Other checkpoints failed
>>>> with the same state like #1247 util I restarted TM.)
>>>> >
>>>> > <snapshot.png>
>>>> >
>>>> > I'm not sure what happened, but the consumer stopped fetching
>>>> records, buffer usage is 100% and the following task did not seem to fetch
>>>> data anymore. Just like the whole TM was stopped.
>>>> >
>>>> > However, after I restarted TM and force the job restarting from the
>>>> latest completed checkpoint, everything worked again. And I don't know how
>>>> to reproduce it.
>>>> >
>>>> > The attachment is my TM log. Because there are many user logs and
>>>> sensitive information, I only remain the log from `org.apache.flink...`.
>>>> >
>>>> > My cluster setting is one JM and one TM with 4 available slots.
>>>> >
>>>> > Streaming job uses all slots, checkpoint interval is 5 mins and max
>>>> concurrent number is 3.
>>>> >
>>>> > Please let me know if it needs more information to find out what
>>>> happened on my streaming job. Thanks for your help.
>>>> >
>>>> > Best Regards,
>>>> > Tony Wei
>>>> > <flink-root-taskmanager-0-partial.log>
>>>>
>>>>
>>>
>>>
>>
>>
>

Re: Stream Task seems to be blocked after checkpoint timeout

Posted by Tony Wei <to...@gmail.com>.
Hi Stefan,

Here is the summary for my streaming job's checkpoint after restarting at
last night.

[image: 內置圖片 1]

This is the distribution of alignment buffered from the last 12 hours.

[image: 內置圖片 3]

And here is the buffer out pool usage during chk #1140 ~ #1142. For chk
#1245 and #1246, you can check the picture I sent before.

 [image: 內置圖片 2]

AFAIK, the back pressure rate usually is in LOW status, sometimes goes up
to HIGH, and always OK during the night.

Best Regards,
Tony Wei


2017-09-27 16:54 GMT+08:00 Stefan Richter <s....@data-artisans.com>:

> Hi Tony,
>
> are your checkpoints typically close to the timeout boundary? From what I
> see, writing the checkpoint is relatively fast but the time from the
> checkpoint trigger to execution seems very long. This is typically the case
> if your job has a lot of backpressure and therefore the checkpoint barriers
> take a long time to travel to the operators, because a lot of events are
> piling up in the buffers. Do you also experience large alignments for your
> checkpoints?
>
> Best,
> Stefan
>
> Am 27.09.2017 um 10:43 schrieb Tony Wei <to...@gmail.com>:
>
> Hi Stefan,
>
> It seems that I found something strange from JM's log.
>
> It had happened more than once before, but all subtasks would finish their
> checkpoint attempts in the end.
>
> 2017-09-26 01:23:28,690 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> - Triggering checkpoint 1140 @ 1506389008690
> 2017-09-26 01:28:28,690 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> - Triggering checkpoint 1141 @ 1506389308690
> 2017-09-26 01:33:28,690 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> - Triggering checkpoint 1142 @ 1506389608690
> 2017-09-26 01:33:28,691 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> - Checkpoint 1140 expired before completing.
> 2017-09-26 01:38:28,691 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> - Checkpoint 1141 expired before completing.
> 2017-09-26 01:40:38,044 WARN org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> - Received late message for now expired checkpoint attempt 1140 from
> c63825d15de0fef55a1d148adcf4467e of job 7c039572b...
> 2017-09-26 01:40:53,743 WARN org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> - Received late message for now expired checkpoint attempt 1141 from
> c63825d15de0fef55a1d148adcf4467e of job 7c039572b...
> 2017-09-26 01:41:19,332 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> - Completed checkpoint 1142 (136733704 bytes in 457413 ms).
>
> For chk #1245 and #1246, there was no late message from TM. You can refer
> to the TM log. The full completed checkpoint attempt will have 12
> (... asynchronous part) logs in general, but #1245 and #1246 only got 10
> logs.
>
> 2017-09-26 10:08:28,690 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> - Triggering checkpoint 1245 @ 1506420508690
> 2017-09-26 10:13:28,690 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> - Triggering checkpoint 1246 @ 1506420808690
> 2017-09-26 10:18:28,691 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> - Checkpoint 1245 expired before completing.
> 2017-09-26 10:23:28,691 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> - Checkpoint 1246 expired before completing.
>
> Moreover, I listed the directory for checkpoints on S3 and saw there were
> two states not discarded successfully. In general, there will be 16 parts
> for a completed checkpoint state.
>
> 2017-09-26 18:08:33 36919 tony-dev/flink-checkpoints/
> 7c039572b13346f1b17dcc0ace2b72c2/chk-1245/eedd7ca5-ee34-
> 45a5-bf0b-11cc1fc67ab8
> 2017-09-26 18:13:34 37419 tony-dev/flink-checkpoints/
> 7c039572b13346f1b17dcc0ace2b72c2/chk-1246/9aa5c6c4-8c74-
> 465d-8509-5fea4ed25af6
>
> Hope these informations are helpful. Thank you.
>
> Best Regards,
> Tony Wei
>
> 2017-09-27 16:14 GMT+08:00 Stefan Richter <s....@data-artisans.com>:
>
>> Hi,
>>
>> thanks for the information. Unfortunately, I have no immediate idea what
>> the reason is from the given information. I think most helpful could be a
>> thread dump, but also metrics on the operator operator level to figure out
>> which part of the pipeline is the culprit.
>>
>> Best,
>> Stefan
>>
>> Am 26.09.2017 um 17:55 schrieb Tony Wei <to...@gmail.com>:
>>
>> Hi Stefan,
>>
>> There is no unknown exception in my full log. The Flink version is 1.3.2.
>> My job is roughly like this.
>>
>> env.addSource(Kafka)
>>   .map(ParseKeyFromRecord)
>>   .keyBy()
>>   .process(CountAndTimeoutWindow)
>>   .asyncIO(UploadToS3)
>>   .addSink(UpdateDatabase)
>>
>> It seemed all tasks stopped like the picture I sent in the last email.
>>
>> I will keep my eye on taking a thread dump from that JVM if this happens
>> again.
>>
>> Best Regards,
>> Tony Wei
>>
>> 2017-09-26 23:46 GMT+08:00 Stefan Richter <s....@data-artisans.com>:
>>
>>> Hi,
>>>
>>> that is very strange indeed. I had a look at the logs and there is no
>>> error or exception reported. I assume there is also no exception in your
>>> full logs? Which version of flink are you using and what operators were
>>> running in the task that stopped? If this happens again, would it be
>>> possible to take a thread dump from that JVM?
>>>
>>> Best,
>>> Stefan
>>>
>>> > Am 26.09.2017 um 17:08 schrieb Tony Wei <to...@gmail.com>:
>>> >
>>> > Hi,
>>> >
>>> > Something weird happened on my streaming job.
>>> >
>>> > I found my streaming job seems to be blocked for a long time and I saw
>>> the situation like the picture below. (chk #1245 and #1246 were all
>>> finishing 7/8 tasks then marked timeout by JM. Other checkpoints failed
>>> with the same state like #1247 util I restarted TM.)
>>> >
>>> > <snapshot.png>
>>> >
>>> > I'm not sure what happened, but the consumer stopped fetching records,
>>> buffer usage is 100% and the following task did not seem to fetch data
>>> anymore. Just like the whole TM was stopped.
>>> >
>>> > However, after I restarted TM and force the job restarting from the
>>> latest completed checkpoint, everything worked again. And I don't know how
>>> to reproduce it.
>>> >
>>> > The attachment is my TM log. Because there are many user logs and
>>> sensitive information, I only remain the log from `org.apache.flink...`.
>>> >
>>> > My cluster setting is one JM and one TM with 4 available slots.
>>> >
>>> > Streaming job uses all slots, checkpoint interval is 5 mins and max
>>> concurrent number is 3.
>>> >
>>> > Please let me know if it needs more information to find out what
>>> happened on my streaming job. Thanks for your help.
>>> >
>>> > Best Regards,
>>> > Tony Wei
>>> > <flink-root-taskmanager-0-partial.log>
>>>
>>>
>>
>>
>
>

Re: Stream Task seems to be blocked after checkpoint timeout

Posted by Stefan Richter <s....@data-artisans.com>.
Hi Tony,

are your checkpoints typically close to the timeout boundary? From what I see, writing the checkpoint is relatively fast but the time from the checkpoint trigger to execution seems very long. This is typically the case if your job has a lot of backpressure and therefore the checkpoint barriers take a long time to travel to the operators, because a lot of events are piling up in the buffers. Do you also experience large alignments for your checkpoints?

Best,
Stefan  

> Am 27.09.2017 um 10:43 schrieb Tony Wei <to...@gmail.com>:
> 
> Hi Stefan,
> 
> It seems that I found something strange from JM's log.
> 
> It had happened more than once before, but all subtasks would finish their checkpoint attempts in the end.
> 
> 2017-09-26 01:23:28,690 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1140 @ 1506389008690
> 2017-09-26 01:28:28,690 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1141 @ 1506389308690
> 2017-09-26 01:33:28,690 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1142 @ 1506389608690
> 2017-09-26 01:33:28,691 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 1140 expired before completing.
> 2017-09-26 01:38:28,691 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 1141 expired before completing.
> 2017-09-26 01:40:38,044 WARN org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late message for now expired checkpoint attempt 1140 from c63825d15de0fef55a1d148adcf4467e of job 7c039572b...
> 2017-09-26 01:40:53,743 WARN org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late message for now expired checkpoint attempt 1141 from c63825d15de0fef55a1d148adcf4467e of job 7c039572b...
> 2017-09-26 01:41:19,332 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 1142 (136733704 bytes in 457413 ms).
> 
> For chk #1245 and #1246, there was no late message from TM. You can refer to the TM log. The full completed checkpoint attempt will have 12 (... asynchronous part) logs in general, but #1245 and #1246 only got 10 logs.
> 
> 2017-09-26 10:08:28,690 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1245 @ 1506420508690
> 2017-09-26 10:13:28,690 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1246 @ 1506420808690
> 2017-09-26 10:18:28,691 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 1245 expired before completing.
> 2017-09-26 10:23:28,691 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 1246 expired before completing.
> 
> Moreover, I listed the directory for checkpoints on S3 and saw there were two states not discarded successfully. In general, there will be 16 parts for a completed checkpoint state.
> 
> 2017-09-26 18:08:33 36919 tony-dev/flink-checkpoints/7c039572b13346f1b17dcc0ace2b72c2/chk-1245/eedd7ca5-ee34-45a5-bf0b-11cc1fc67ab8
> 2017-09-26 18:13:34 37419 tony-dev/flink-checkpoints/7c039572b13346f1b17dcc0ace2b72c2/chk-1246/9aa5c6c4-8c74-465d-8509-5fea4ed25af6
> 
> Hope these informations are helpful. Thank you.
> 
> Best Regards,
> Tony Wei
> 
> 2017-09-27 16:14 GMT+08:00 Stefan Richter <s.richter@data-artisans.com <ma...@data-artisans.com>>:
> Hi,
> 
> thanks for the information. Unfortunately, I have no immediate idea what the reason is from the given information. I think most helpful could be a thread dump, but also metrics on the operator operator level to figure out which part of the pipeline is the culprit.
> 
> Best,
> Stefan
> 
>> Am 26.09.2017 um 17:55 schrieb Tony Wei <tony19920430@gmail.com <ma...@gmail.com>>:
>> 
>> Hi Stefan,
>> 
>> There is no unknown exception in my full log. The Flink version is 1.3.2.
>> My job is roughly like this.
>> 
>> env.addSource(Kafka)
>>   .map(ParseKeyFromRecord)
>>   .keyBy()
>>   .process(CountAndTimeoutWindow)
>>   .asyncIO(UploadToS3)
>>   .addSink(UpdateDatabase)
>> 
>> It seemed all tasks stopped like the picture I sent in the last email.
>> 
>> I will keep my eye on taking a thread dump from that JVM if this happens again.
>> 
>> Best Regards,
>> Tony Wei
>> 
>> 2017-09-26 23:46 GMT+08:00 Stefan Richter <s.richter@data-artisans.com <ma...@data-artisans.com>>:
>> Hi,
>> 
>> that is very strange indeed. I had a look at the logs and there is no error or exception reported. I assume there is also no exception in your full logs? Which version of flink are you using and what operators were running in the task that stopped? If this happens again, would it be possible to take a thread dump from that JVM?
>> 
>> Best,
>> Stefan
>> 
>> > Am 26.09.2017 um 17:08 schrieb Tony Wei <tony19920430@gmail.com <ma...@gmail.com>>:
>> >
>> > Hi,
>> >
>> > Something weird happened on my streaming job.
>> >
>> > I found my streaming job seems to be blocked for a long time and I saw the situation like the picture below. (chk #1245 and #1246 were all finishing 7/8 tasks then marked timeout by JM. Other checkpoints failed with the same state like #1247 util I restarted TM.)
>> >
>> > <snapshot.png>
>> >
>> > I'm not sure what happened, but the consumer stopped fetching records, buffer usage is 100% and the following task did not seem to fetch data anymore. Just like the whole TM was stopped.
>> >
>> > However, after I restarted TM and force the job restarting from the latest completed checkpoint, everything worked again. And I don't know how to reproduce it.
>> >
>> > The attachment is my TM log. Because there are many user logs and sensitive information, I only remain the log from `org.apache.flink...`.
>> >
>> > My cluster setting is one JM and one TM with 4 available slots.
>> >
>> > Streaming job uses all slots, checkpoint interval is 5 mins and max concurrent number is 3.
>> >
>> > Please let me know if it needs more information to find out what happened on my streaming job. Thanks for your help.
>> >
>> > Best Regards,
>> > Tony Wei
>> > <flink-root-taskmanager-0-partial.log>
>> 
>> 
> 
> 


Re: Stream Task seems to be blocked after checkpoint timeout

Posted by Tony Wei <to...@gmail.com>.
Hi Stefan,

It seems that I found something strange from JM's log.

It had happened more than once before, but all subtasks would finish their
checkpoint attempts in the end.

2017-09-26 01:23:28,690 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
checkpoint 1140 @ 1506389008690
2017-09-26 01:28:28,690 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
checkpoint 1141 @ 1506389308690
2017-09-26 01:33:28,690 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
checkpoint 1142 @ 1506389608690
2017-09-26 01:33:28,691 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 1140
expired before completing.
2017-09-26 01:38:28,691 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 1141
expired before completing.
2017-09-26 01:40:38,044 WARN
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late
message for now expired checkpoint attempt 1140 from
c63825d15de0fef55a1d148adcf4467e of job 7c039572b...
2017-09-26 01:40:53,743 WARN
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late
message for now expired checkpoint attempt 1141 from
c63825d15de0fef55a1d148adcf4467e of job 7c039572b...
2017-09-26 01:41:19,332 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed
checkpoint 1142 (136733704 bytes in 457413 ms).

For chk #1245 and #1246, there was no late message from TM. You can refer
to the TM log. The full completed checkpoint attempt will have 12
(... asynchronous part) logs in general, but #1245 and #1246 only got 10
logs.

2017-09-26 10:08:28,690 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
checkpoint 1245 @ 1506420508690
2017-09-26 10:13:28,690 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
checkpoint 1246 @ 1506420808690
2017-09-26 10:18:28,691 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 1245
expired before completing.
2017-09-26 10:23:28,691 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 1246
expired before completing.

Moreover, I listed the directory for checkpoints on S3 and saw there were
two states not discarded successfully. In general, there will be 16 parts
for a completed checkpoint state.

2017-09-26 18:08:33 36919
tony-dev/flink-checkpoints/7c039572b13346f1b17dcc0ace2b72c2/chk-1245/eedd7ca5-ee34-45a5-bf0b-11cc1fc67ab8
2017-09-26 18:13:34 37419
tony-dev/flink-checkpoints/7c039572b13346f1b17dcc0ace2b72c2/chk-1246/9aa5c6c4-8c74-465d-8509-5fea4ed25af6

Hope these informations are helpful. Thank you.

Best Regards,
Tony Wei

2017-09-27 16:14 GMT+08:00 Stefan Richter <s....@data-artisans.com>:

> Hi,
>
> thanks for the information. Unfortunately, I have no immediate idea what
> the reason is from the given information. I think most helpful could be a
> thread dump, but also metrics on the operator operator level to figure out
> which part of the pipeline is the culprit.
>
> Best,
> Stefan
>
> Am 26.09.2017 um 17:55 schrieb Tony Wei <to...@gmail.com>:
>
> Hi Stefan,
>
> There is no unknown exception in my full log. The Flink version is 1.3.2.
> My job is roughly like this.
>
> env.addSource(Kafka)
>   .map(ParseKeyFromRecord)
>   .keyBy()
>   .process(CountAndTimeoutWindow)
>   .asyncIO(UploadToS3)
>   .addSink(UpdateDatabase)
>
> It seemed all tasks stopped like the picture I sent in the last email.
>
> I will keep my eye on taking a thread dump from that JVM if this happens
> again.
>
> Best Regards,
> Tony Wei
>
> 2017-09-26 23:46 GMT+08:00 Stefan Richter <s....@data-artisans.com>:
>
>> Hi,
>>
>> that is very strange indeed. I had a look at the logs and there is no
>> error or exception reported. I assume there is also no exception in your
>> full logs? Which version of flink are you using and what operators were
>> running in the task that stopped? If this happens again, would it be
>> possible to take a thread dump from that JVM?
>>
>> Best,
>> Stefan
>>
>> > Am 26.09.2017 um 17:08 schrieb Tony Wei <to...@gmail.com>:
>> >
>> > Hi,
>> >
>> > Something weird happened on my streaming job.
>> >
>> > I found my streaming job seems to be blocked for a long time and I saw
>> the situation like the picture below. (chk #1245 and #1246 were all
>> finishing 7/8 tasks then marked timeout by JM. Other checkpoints failed
>> with the same state like #1247 util I restarted TM.)
>> >
>> > <snapshot.png>
>> >
>> > I'm not sure what happened, but the consumer stopped fetching records,
>> buffer usage is 100% and the following task did not seem to fetch data
>> anymore. Just like the whole TM was stopped.
>> >
>> > However, after I restarted TM and force the job restarting from the
>> latest completed checkpoint, everything worked again. And I don't know how
>> to reproduce it.
>> >
>> > The attachment is my TM log. Because there are many user logs and
>> sensitive information, I only remain the log from `org.apache.flink...`.
>> >
>> > My cluster setting is one JM and one TM with 4 available slots.
>> >
>> > Streaming job uses all slots, checkpoint interval is 5 mins and max
>> concurrent number is 3.
>> >
>> > Please let me know if it needs more information to find out what
>> happened on my streaming job. Thanks for your help.
>> >
>> > Best Regards,
>> > Tony Wei
>> > <flink-root-taskmanager-0-partial.log>
>>
>>
>
>

Re: Stream Task seems to be blocked after checkpoint timeout

Posted by Stefan Richter <s....@data-artisans.com>.
Hi,

thanks for the information. Unfortunately, I have no immediate idea what the reason is from the given information. I think most helpful could be a thread dump, but also metrics on the operator operator level to figure out which part of the pipeline is the culprit.

Best,
Stefan

> Am 26.09.2017 um 17:55 schrieb Tony Wei <to...@gmail.com>:
> 
> Hi Stefan,
> 
> There is no unknown exception in my full log. The Flink version is 1.3.2.
> My job is roughly like this.
> 
> env.addSource(Kafka)
>   .map(ParseKeyFromRecord)
>   .keyBy()
>   .process(CountAndTimeoutWindow)
>   .asyncIO(UploadToS3)
>   .addSink(UpdateDatabase)
> 
> It seemed all tasks stopped like the picture I sent in the last email.
> 
> I will keep my eye on taking a thread dump from that JVM if this happens again.
> 
> Best Regards,
> Tony Wei
> 
> 2017-09-26 23:46 GMT+08:00 Stefan Richter <s.richter@data-artisans.com <ma...@data-artisans.com>>:
> Hi,
> 
> that is very strange indeed. I had a look at the logs and there is no error or exception reported. I assume there is also no exception in your full logs? Which version of flink are you using and what operators were running in the task that stopped? If this happens again, would it be possible to take a thread dump from that JVM?
> 
> Best,
> Stefan
> 
> > Am 26.09.2017 um 17:08 schrieb Tony Wei <tony19920430@gmail.com <ma...@gmail.com>>:
> >
> > Hi,
> >
> > Something weird happened on my streaming job.
> >
> > I found my streaming job seems to be blocked for a long time and I saw the situation like the picture below. (chk #1245 and #1246 were all finishing 7/8 tasks then marked timeout by JM. Other checkpoints failed with the same state like #1247 util I restarted TM.)
> >
> > <snapshot.png>
> >
> > I'm not sure what happened, but the consumer stopped fetching records, buffer usage is 100% and the following task did not seem to fetch data anymore. Just like the whole TM was stopped.
> >
> > However, after I restarted TM and force the job restarting from the latest completed checkpoint, everything worked again. And I don't know how to reproduce it.
> >
> > The attachment is my TM log. Because there are many user logs and sensitive information, I only remain the log from `org.apache.flink...`.
> >
> > My cluster setting is one JM and one TM with 4 available slots.
> >
> > Streaming job uses all slots, checkpoint interval is 5 mins and max concurrent number is 3.
> >
> > Please let me know if it needs more information to find out what happened on my streaming job. Thanks for your help.
> >
> > Best Regards,
> > Tony Wei
> > <flink-root-taskmanager-0-partial.log>
> 
> 


Re: Stream Task seems to be blocked after checkpoint timeout

Posted by Tony Wei <to...@gmail.com>.
Hi Stefan,

There is no unknown exception in my full log. The Flink version is 1.3.2.
My job is roughly like this.

env.addSource(Kafka)
  .map(ParseKeyFromRecord)
  .keyBy()
  .process(CountAndTimeoutWindow)
  .asyncIO(UploadToS3)
  .addSink(UpdateDatabase)

It seemed all tasks stopped like the picture I sent in the last email.

I will keep my eye on taking a thread dump from that JVM if this happens
again.

Best Regards,
Tony Wei

2017-09-26 23:46 GMT+08:00 Stefan Richter <s....@data-artisans.com>:

> Hi,
>
> that is very strange indeed. I had a look at the logs and there is no
> error or exception reported. I assume there is also no exception in your
> full logs? Which version of flink are you using and what operators were
> running in the task that stopped? If this happens again, would it be
> possible to take a thread dump from that JVM?
>
> Best,
> Stefan
>
> > Am 26.09.2017 um 17:08 schrieb Tony Wei <to...@gmail.com>:
> >
> > Hi,
> >
> > Something weird happened on my streaming job.
> >
> > I found my streaming job seems to be blocked for a long time and I saw
> the situation like the picture below. (chk #1245 and #1246 were all
> finishing 7/8 tasks then marked timeout by JM. Other checkpoints failed
> with the same state like #1247 util I restarted TM.)
> >
> > <snapshot.png>
> >
> > I'm not sure what happened, but the consumer stopped fetching records,
> buffer usage is 100% and the following task did not seem to fetch data
> anymore. Just like the whole TM was stopped.
> >
> > However, after I restarted TM and force the job restarting from the
> latest completed checkpoint, everything worked again. And I don't know how
> to reproduce it.
> >
> > The attachment is my TM log. Because there are many user logs and
> sensitive information, I only remain the log from `org.apache.flink...`.
> >
> > My cluster setting is one JM and one TM with 4 available slots.
> >
> > Streaming job uses all slots, checkpoint interval is 5 mins and max
> concurrent number is 3.
> >
> > Please let me know if it needs more information to find out what
> happened on my streaming job. Thanks for your help.
> >
> > Best Regards,
> > Tony Wei
> > <flink-root-taskmanager-0-partial.log>
>
>

Re: Stream Task seems to be blocked after checkpoint timeout

Posted by Stefan Richter <s....@data-artisans.com>.
Hi,

that is very strange indeed. I had a look at the logs and there is no error or exception reported. I assume there is also no exception in your full logs? Which version of flink are you using and what operators were running in the task that stopped? If this happens again, would it be possible to take a thread dump from that JVM?

Best,
Stefan

> Am 26.09.2017 um 17:08 schrieb Tony Wei <to...@gmail.com>:
> 
> Hi,
> 
> Something weird happened on my streaming job.
> 
> I found my streaming job seems to be blocked for a long time and I saw the situation like the picture below. (chk #1245 and #1246 were all finishing 7/8 tasks then marked timeout by JM. Other checkpoints failed with the same state like #1247 util I restarted TM.)
> 
> <snapshot.png>
> 
> I'm not sure what happened, but the consumer stopped fetching records, buffer usage is 100% and the following task did not seem to fetch data anymore. Just like the whole TM was stopped.
> 
> However, after I restarted TM and force the job restarting from the latest completed checkpoint, everything worked again. And I don't know how to reproduce it.
> 
> The attachment is my TM log. Because there are many user logs and sensitive information, I only remain the log from `org.apache.flink...`.
> 
> My cluster setting is one JM and one TM with 4 available slots.
> 
> Streaming job uses all slots, checkpoint interval is 5 mins and max concurrent number is 3.
> 
> Please let me know if it needs more information to find out what happened on my streaming job. Thanks for your help.
> 
> Best Regards,
> Tony Wei
> <flink-root-taskmanager-0-partial.log>