Posted to user-zh@flink.apache.org by Tianwang Li <li...@gmail.com> on 2020/06/28 02:17:05 UTC

Flink-1.10.0: source checkpoint occasionally takes a long time

A question about Flink checkpoints occasionally taking a long time.

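*Environment and background:*
Version: Flink 1.10.0
Data volume: roughly 100,000 records per second; the data source is Kafka
Computation: sliding-window aggregation; each window emits roughly 10 to 20 million records.
Backpressure: the job is frequently backpressured (especially while a window is emitting its output).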
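
As a rough back-of-the-envelope check on the load described above, the sketch below computes how many sliding windows each record falls into. The window size and slide are taken from the operator name in the logs further down, SlidingEventTimeWindows(10800000, 300000, 0); the input rate is the approximate figure stated above. The variable names are illustrative only.

```python
# Window parameters as they appear in the operator name in the logs
# of this message: SlidingEventTimeWindows(10800000, 300000, 0).
WINDOW_SIZE_MS = 10_800_000   # 3 hours
WINDOW_SLIDE_MS = 300_000     # 5 minutes
RECORDS_PER_SECOND = 100_000  # approximate input rate stated above


def windows_per_record(size_ms: int, slide_ms: int) -> int:
    """Number of sliding windows that contain a single event timestamp
    (valid when the size is a multiple of the slide)."""
    return size_ms // slide_ms


overlap = windows_per_record(WINDOW_SIZE_MS, WINDOW_SLIDE_MS)
window_updates_per_second = RECORDS_PER_SECOND * overlap

print(overlap)                    # 36
print(window_updates_per_second)  # 3600000
```

Under these assumptions every record is assigned to 36 windows, so the window operator handles on the order of 3.6 million window assignments per second.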

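*Problem:*
    Most of the time a checkpoint completes within 1 minute, but occasionally a checkpoint takes more than 30 minutes (this is infrequent, about once every 1~2 days).
The source's checkpoint is what takes long: a lot of time passes between "Trigger checkpoint" and "Starting checkpoint".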
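
The size of that gap can be read off the two DEBUG lines for checkpoint 316 in the log below ("Trigger checkpoint 316@..." from TaskExecutor and "Starting checkpoint (316)" from StreamTask on the source task). A minimal sketch of the arithmetic, with the two timestamps copied from those lines and the helper name chosen here for illustration:

```python
from datetime import datetime

# Timestamp layout used by the log lines in this message.
LOG_TS_FORMAT = "%Y-%m-%d %H:%M:%S,%f"


def parse_ts(text: str) -> datetime:
    """Parse a log timestamp such as '2020-06-24 21:09:53,369'."""
    return datetime.strptime(text, LOG_TS_FORMAT)


# "Trigger checkpoint 316@1593004193363" (TaskExecutor)
trigger = parse_ts("2020-06-24 21:09:53,369")
# "Starting checkpoint (316) CHECKPOINT on task Source: ..." (StreamTask)
start = parse_ts("2020-06-24 21:55:41,721")

gap_seconds = (start - trigger).total_seconds()
print(gap_seconds)  # 2748.352
```

That is roughly 45 minutes 48 seconds between the trigger arriving at the TaskManager and the source task actually starting its snapshot, while the snapshot itself (synchronous plus asynchronous part) takes only a few milliseconds according to the same log.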

The checkpoint details are roughly as follows:

[image: image.png]
[image: image.png]
[image: image.png]

2020-06-24 21:09:53,369 DEBUG
org.apache.flink.runtime.taskexecutor.TaskExecutor            - Trigger
checkpoint 316@1593004193363 for 84dce1ec8aa5a4df2d1758d6e9278693.

2020-06-24 21:09:58,327 DEBUG
org.apache.flink.runtime.taskexecutor.TaskExecutor            - Received
heartbeat request from e88ea2f790430c9c160e540ef0546d60.

2020-06-24 21:09:59,266 DEBUG
org.apache.flink.runtime.taskexecutor.TaskExecutor            - Received
heartbeat request from b93d7167db364dfdcbda886944f1482f.

2020-06-24 21:09:59,686 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner
      - Memory usage stats: [HEAP: 202/2573/2573 MB, NON HEAP: 111/114/424
MB (used/committed/max)]

2020-06-24 21:09:59,686 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner
      - Direct memory stats: Count: 17403, Total Capacity: 583911423, Used
Memory: 583911424

2020-06-24 21:09:59,686 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner
      - Off-heap pool stats: [Code Cache: 35/35/240 MB
(used/committed/max)], [Metaspace: 67/69/96 MB (used/committed/max)],
[Compressed Class Space: 8/9/88 MB (used/committed/max)]

2020-06-24 21:09:59,686 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner
      - Garbage collector stats: [PS Scavenge, GC TIME (ms): 108643, GC
COUNT: 6981], [PS MarkSweep, GC TIME (ms): 1074, GC COUNT: 6]

2020-06-24 21:10:08,346 DEBUG
org.apache.flink.runtime.taskexecutor.TaskExecutor            - Received
heartbeat request from e88ea2f790430c9c160e540ef0546d60.

2020-06-24 21:10:09,286 DEBUG
org.apache.flink.runtime.taskexecutor.TaskExecutor            - Received
heartbeat request from b93d7167db364dfdcbda886944f1482f.

2020-06-24 21:10:09,686 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner
      - Memory usage stats: [HEAP: 557/2573/2573 MB, NON HEAP: 111/114/424
MB (used/committed/max)]

2020-06-24 21:10:09,686 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner
      - Direct memory stats: Count: 17403, Total Capacity: 583911423, Used
Memory: 583911424

2020-06-24 21:10:09,686 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner
      - Off-heap pool stats: [Code Cache: 35/35/240 MB
(used/committed/max)], [Metaspace: 67/69/96 MB (used/committed/max)],
[Compressed Class Space: 8/9/88 MB (used/committed/max)]

2020-06-24 21:10:09,686 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner
      - Garbage collector stats: [PS Scavenge, GC TIME (ms): 108643, GC
COUNT: 6981], [PS MarkSweep, GC TIME (ms): 1074, GC COUNT: 6]


[... omitted ...]


2020-06-24 21:55:39,875 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner
      - Direct memory stats: Count: 17403, Total Capacity: 583911423, Used
Memory: 583911424

2020-06-24 21:55:39,875 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner
      - Off-heap pool stats: [Code Cache: 35/35/240 MB
(used/committed/max)], [Metaspace: 67/69/96 MB (used/committed/max)],
[Compressed Class Space: 8/9/88 MB (used/committed/max)]

2020-06-24 21:55:39,876 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner
      - Garbage collector stats: [PS Scavenge, GC TIME (ms): 110520, GC
COUNT: 7083], [PS MarkSweep, GC TIME (ms): 1074, GC COUNT: 6]

2020-06-24 21:55:41,721 DEBUG
org.apache.flink.streaming.runtime.tasks.StreamTask           - Starting
checkpoint (316) CHECKPOINT on task Source: Custom Source -> Map -> Filter
-> Timestamps/Watermarks (4/10)

2020-06-24 21:55:41,721 DEBUG
org.apache.flink.runtime.state.AbstractSnapshotStrategy       -
DefaultOperatorStateBackend snapshot (FsCheckpointStorageLocation
{fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@4a4b11cd,
checkpointDirectory=hdfs://xxxxchk-316,
sharedStateDirectory=hdfs://xxxxshared,
taskOwnedStateDirectory=hdfs://xxxxtaskowned,
metadataFilePath=hdfs://xxxxchk-316/_metadata, reference=(default),
fileStateSizeThreshold=1024, writeBufferSize=4096}, synchronous part) in
thread Thread[Source: Custom Source -> Map -> Filter ->
Timestamps/Watermarks (4/10),5,Flink Task Threads] took 0 ms.

2020-06-24 21:55:41,721 DEBUG
org.apache.flink.runtime.state.AbstractSnapshotStrategy       -
DefaultOperatorStateBackend snapshot (FsCheckpointStorageLocation
{fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@4a4b11cd,
checkpointDirectory=hdfs://xxxxchk-316,
sharedStateDirectory=hdfs://xxxxshared,
taskOwnedStateDirectory=hdfs://xxxxtaskowned,
metadataFilePath=hdfs://xxxxchk-316/_metadata, reference=(default),
fileStateSizeThreshold=1024, writeBufferSize=4096}, synchronous part) in
thread Thread[Source: Custom Source -> Map -> Filter ->
Timestamps/Watermarks (4/10),5,Flink Task Threads] took 0 ms.

2020-06-24 21:55:41,721 DEBUG
org.apache.flink.runtime.state.AbstractSnapshotStrategy       -
DefaultOperatorStateBackend snapshot (FsCheckpointStorageLocation
{fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@4a4b11cd,
checkpointDirectory=hdfs://xxxxchk-316,
sharedStateDirectory=hdfs://xxxxshared,
taskOwnedStateDirectory=hdfs://xxxxtaskowned,
metadataFilePath=hdfs://xxxxchk-316/_metadata, reference=(default),
fileStateSizeThreshold=1024, writeBufferSize=4096}, synchronous part) in
thread Thread[Source: Custom Source -> Map -> Filter ->
Timestamps/Watermarks (4/10),5,Flink Task Threads] took 0 ms.

2020-06-24 21:55:41,721 DEBUG
org.apache.flink.runtime.state.AbstractSnapshotStrategy       -
DefaultOperatorStateBackend snapshot (FsCheckpointStorageLocation
{fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@4a4b11cd,
checkpointDirectory=hdfs://xxxxchk-316,
sharedStateDirectory=hdfs://xxxxshared,
taskOwnedStateDirectory=hdfs://xxxxtaskowned,
metadataFilePath=hdfs://xxxxchk-316/_metadata, reference=(default),
fileStateSizeThreshold=1024, writeBufferSize=4096}, synchronous part) in
thread Thread[Source: Custom Source -> Map -> Filter ->
Timestamps/Watermarks (4/10),5,Flink Task Threads] took 0 ms.

2020-06-24 21:55:41,721 DEBUG
org.apache.flink.streaming.runtime.tasks.StreamTask           - Finished
synchronous checkpoints for checkpoint 316 on task Source: Custom Source ->
Map -> Filter -> Timestamps/Watermarks (4/10)

2020-06-24 21:55:41,721 DEBUG
org.apache.flink.streaming.runtime.tasks.StreamTask           - Source:
Custom Source -> Map -> Filter -> Timestamps/Watermarks (4/10) - finished
synchronous part of checkpoint 316. Alignment duration: 0 ms, snapshot
duration 0 ms

2020-06-24 21:55:41,737 DEBUG
org.apache.flink.runtime.state.AbstractSnapshotStrategy       -
DefaultOperatorStateBackend snapshot (FsCheckpointStorageLocation
{fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@4a4b11cd,
checkpointDirectory=hdfs://xxxxchk-316,
sharedStateDirectory=hdfs://xxxxshared,
taskOwnedStateDirectory=hdfs://xxxxtaskowned,
metadataFilePath=hdfs://xxxxchk-316/_metadata, reference=(default),
fileStateSizeThreshold=1024, writeBufferSize=4096}, asynchronous part) in
thread Thread[AsyncOperations-thread-316,5,Flink Task Threads] took 16 ms.

2020-06-24 21:55:41,738 DEBUG
org.apache.flink.streaming.runtime.tasks.StreamTask           - Source:
Custom Source -> Map -> Filter -> Timestamps/Watermarks (4/10) - finished
asynchronous part of checkpoint 316. Asynchronous duration: 16 ms

2020-06-24 21:55:42,008 DEBUG
org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner  -
Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger,
anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10)
(a608e8a1ab75622dc57f164bdbb86743): Received barrier from channel 3.

2020-06-24 21:55:42,008 DEBUG
org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner  -
Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger,
anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10)
(a608e8a1ab75622dc57f164bdbb86743): Starting stream alignment for
checkpoint 316.

2020-06-24 21:55:42,110 DEBUG
org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner  -
Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger,
anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10)
(a608e8a1ab75622dc57f164bdbb86743): Received barrier from channel 7.

2020-06-24 21:55:42,110 DEBUG
org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner  -
Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger,
anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10)
(a608e8a1ab75622dc57f164bdbb86743): Received barrier from channel 2.

2020-06-24 21:55:42,110 DEBUG
org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner  -
Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger,
anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10)
(a608e8a1ab75622dc57f164bdbb86743): Received barrier from channel 8.

2020-06-24 21:55:42,110 DEBUG
org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner  -
Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger,
anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10)
(a608e8a1ab75622dc57f164bdbb86743): Received barrier from channel 6.

2020-06-24 21:55:42,110 DEBUG
org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner  -
Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger,
anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10)
(a608e8a1ab75622dc57f164bdbb86743): Received barrier from channel 0.

2020-06-24 21:55:42,110 DEBUG
org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner  -
Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger,
anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10)
(a608e8a1ab75622dc57f164bdbb86743): Received barrier from channel 5.

2020-06-24 21:55:42,110 DEBUG
org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner  -
Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger,
anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10)
(a608e8a1ab75622dc57f164bdbb86743): Received barrier from channel 9.

2020-06-24 21:55:42,110 DEBUG
org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner  -
Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger,
anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10)
(a608e8a1ab75622dc57f164bdbb86743): Received barrier from channel 4.

2020-06-24 21:55:42,110 DEBUG
org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner  -
Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger,
anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10)
(a608e8a1ab75622dc57f164bdbb86743): Received barrier from channel 1.

2020-06-24 21:55:42,110 DEBUG
org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner  -
Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger,
anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10)
(a608e8a1ab75622dc57f164bdbb86743): Received all barriers, triggering
checkpoint 316 at 1593004193363.

2020-06-24 21:55:42,110 DEBUG
org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner  -
Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger,
anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10)
(a608e8a1ab75622dc57f164bdbb86743): End of stream alignment, feeding
buffered data back.

2020-06-24 21:55:42,110 DEBUG
org.apache.flink.streaming.runtime.tasks.StreamTask           - Starting
checkpoint (316) CHECKPOINT on task
Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger,
anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10)

2020-06-24 21:55:42,110 DEBUG
org.apache.flink.runtime.state.AbstractSnapshotStrategy       -
DefaultOperatorStateBackend snapshot (FsCheckpointStorageLocation
{fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@61164119,
checkpointDirectory=hdfs://xxxxchk-316,
sharedStateDirectory=hdfs://xxxxshared,
taskOwnedStateDirectory=hdfs://xxxxtaskowned,
metadataFilePath=hdfs://xxxxchk-316/_metadata, reference=(default),
fileStateSizeThreshold=1024, writeBufferSize=4096}, synchronous part) in
thread Thread[Window(SlidingEventTimeWindows(10800000, 300000, 0),
EventTimeTrigger, anon$11, ScalaProcessWindowFunctionWrapper) -> Map
(10/10),5,Flink Task Threads] took 0 ms.

2020-06-24 21:55:42,110 DEBUG
org.apache.flink.runtime.state.AbstractSnapshotStrategy       -
DefaultOperatorStateBackend snapshot (FsCheckpointStorageLocation
{fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@61164119,
checkpointDirectory=hdfs://xxxxchk-316,
sharedStateDirectory=hdfs://xxxxshared,
taskOwnedStateDirectory=hdfs://xxxxtaskowned,
metadataFilePath=hdfs://xxxxchk-316/_metadata, reference=(default),
fileStateSizeThreshold=1024, writeBufferSize=4096}, synchronous part) in
thread Thread[Window(SlidingEventTimeWindows(10800000, 300000, 0),
EventTimeTrigger, anon$11, ScalaProcessWindowFunctionWrapper) -> Map
(10/10),5,Flink Task Threads] took 0 ms.

2020-06-24 21:55:42,980 DEBUG
org.apache.flink.runtime.state.AbstractSnapshotStrategy       -
Asynchronous incremental RocksDB snapshot (FsCheckpointStorageLocation
{fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@61164119,
checkpointDirectory=hdfs://xxxx/chk-316,
sharedStateDirectory=hdfs://xxxx/shared,
taskOwnedStateDirectory=hdfs://xxxx/taskowned,
metadataFilePath=hdfs://xxxx/chk-316/_metadata, reference=(default),
fileStateSizeThreshold=1024, writeBufferSize=4096}, synchronous part) in
thread Thread[Window(SlidingEventTimeWindows(10800000, 300000, 0),
EventTimeTrigger, anon$11, ScalaProcessWindowFunctionWrapper) -> Map
(10/10),5,Flink Task Threads] took 870 ms.

2020-06-24 21:55:42,981 DEBUG
org.apache.flink.streaming.runtime.tasks.StreamTask           - Finished
synchronous checkpoints for checkpoint 316 on task
Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger,
anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10)

2020-06-24 21:55:42,981 DEBUG
org.apache.flink.streaming.runtime.tasks.StreamTask           -
Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger,
anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10) - finished
synchronous part of checkpoint 316. Alignment duration: 101 ms, snapshot
duration 870 ms

2020-06-24 21:55:42,981 DEBUG
org.apache.flink.streaming.runtime.io.CachedBufferStorage     -
Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger,
anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10)
(a608e8a1ab75622dc57f164bdbb86743): Size of buffered data: 98304 bytes

2020-06-24 21:55:42,981 DEBUG
org.apache.flink.streaming.runtime.io.CachedBufferStorage     -
Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger,
anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10)
(a608e8a1ab75622dc57f164bdbb86743): Finished feeding back buffered data.

2020-06-24 21:55:43,814 DEBUG
org.apache.flink.runtime.taskexecutor.TaskExecutor            - Received
heartbeat request from e88ea2f790430c9c160e540ef0546d60.

2020-06-24 21:55:44,758 DEBUG
org.apache.flink.runtime.taskexecutor.TaskExecutor            - Received
heartbeat request from b93d7167db364dfdcbda886944f1482f.

2020-06-24 21:55:45,714 DEBUG
org.apache.flink.runtime.state.AbstractSnapshotStrategy       -
Asynchronous incremental RocksDB snapshot (FsCheckpointStorageLocation
{fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@61164119,
checkpointDirectory=hdfs://xxxx/chk-316,
sharedStateDirectory=hdfs://xxxx/shared,
taskOwnedStateDirectory=hdfs://xxxx/taskowned,
metadataFilePath=hdfs://xxxx/chk-316/_metadata, reference=(default),
fileStateSizeThreshold=1024, writeBufferSize=4096}, asynchronous part) in
thread Thread[AsyncOperations-thread-316,5,Flink Task Threads] took 2733 ms.

2020-06-24 21:55:45,715 DEBUG
org.apache.flink.streaming.runtime.tasks.StreamTask           -
Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger,
anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10) - finished
asynchronous part of checkpoint 316. Asynchronous duration: 2734 ms

2020-06-24 21:55:49,876 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner
      - Memory usage stats: [HEAP: 220/2589/2589 MB, NON HEAP: 111/114/424
MB (used/committed/max)]

2020-06-24 21:55:49,876 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner
      - Direct memory stats: Count: 17414, Total Capacity: 584128352, Used
Memory: 584128353

2020-06-24 21:55:49,876 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner
      - Off-heap pool stats: [Code Cache: 35/35/240 MB
(used/committed/max)], [Metaspace: 67/69/96 MB (used/committed/max)],
[Compressed Class Space: 8/9/88 MB (used/committed/max)]

2020-06-24 21:55:49,876 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner
      - Garbage collector stats: [PS Scavenge, GC TIME (ms): 110525, GC
COUNT: 7084], [PS MarkSweep, GC TIME (ms): 1074, GC COUNT: 6]
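
One thing that can be checked directly from the stats lines above is how much garbage collection happened while checkpoint 316 was pending. The sketch below compares the first "Garbage collector stats" line after the trigger with the last one before the source started its snapshot; the numbers are copied from those two lines, and the dictionary layout is only an illustration.

```python
from datetime import datetime

LOG_TS_FORMAT = "%Y-%m-%d %H:%M:%S,%f"

# PS Scavenge figures copied from the two "Garbage collector stats" lines.
first = {"ts": "2020-06-24 21:09:59,686", "gc_ms": 108643, "gc_count": 6981}
last = {"ts": "2020-06-24 21:55:39,876", "gc_ms": 110520, "gc_count": 7083}

elapsed_s = (
    datetime.strptime(last["ts"], LOG_TS_FORMAT)
    - datetime.strptime(first["ts"], LOG_TS_FORMAT)
).total_seconds()

gc_ms = last["gc_ms"] - first["gc_ms"]            # young-gen GC time in the interval
gc_runs = last["gc_count"] - first["gc_count"]    # young-gen collections in the interval
gc_share = (gc_ms / 1000) / elapsed_s             # fraction of wall-clock time spent in GC

print(gc_ms, gc_runs)  # 1877 102
print(gc_share < 0.001)  # True
```

Young-generation GC accounts for well under 0.1% of the wall-clock time in this interval, and the PS MarkSweep counters do not change at all, so long GC pauses look like an unlikely explanation on this TaskManager. This is only an inference from the lines shown here.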


-- 
**************************************
 tivanli
**************************************

Re: Flink-1.10.0: source checkpoint occasionally takes a long time

Posted by Tianwang Li <li...@gmail.com>.
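>
> Since the checkpoint timeout only shows up once every day or two, check whether some class of key in your job might suddenly surge during those one or two days.
>
I will increase the time each task spends processing the window data and observe again.
This is a test job with no sink output.
source -> window -> window (the second window counts the records output by the previous window and prints roughly 10 records)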
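
The key-surge check suggested in the quoted reply could be approximated offline along these lines. This is a minimal sketch on a made-up sample: the keys, the sample itself, and the function name are hypothetical, and in practice the keys would have to be sampled from the Kafka topic for the period around a slow checkpoint.

```python
from collections import Counter


def top_key_share(keys):
    """Return (most frequent key, its share of all records) for a sample."""
    counts = Counter(keys)
    key, count = counts.most_common(1)[0]
    return key, count / len(keys)


# Hypothetical sample of record keys, not data from this job.
sample = ["a"] * 70 + ["b"] * 20 + ["c"] * 10

key, share = top_key_share(sample)
print(key, share)  # a 0.7
```

Comparing this share between a normal period and the period around a 30-minute checkpoint would show whether a single key dominates one window subtask at those times.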

LakeShen <sh...@gmail.com> wrote on Sunday, June 28, 2020 at 10:35 AM:

> Hi Tianwang Li,
>
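> Since the checkpoint timeout only shows up once every day or two, check whether some class of key in your job might suddenly surge during those one or two days.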
>
> Best,
> LakeShen
>
> zhisheng <zh...@gmail.com> wrote on Sunday, June 28, 2020 at 10:27 AM:
>
> > hi, Tianwang Li
> >
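> > I see that the three images failed to load. You could try uploading them to a third-party image host and pasting the links here. Also:
> >
> > > The job is frequently backpressured (especially while a window is emitting its output)
> >
> > For this, check the operators downstream of the window. For example, the window may be emitting too much data while the sink parallelism is unchanged, so processing cannot keep up and backpressure results.
> >
> >
> > > Most of the time a checkpoint completes within 1 minute, but occasionally a checkpoint takes more than 30 minutes (this is infrequent, about once every 1~2 days)
> >
> > For this, it is also worth checking whether HDFS is sometimes under heavy load, which would cause latency spikes.
> >
> > I also suggest adding screenshots of the checkpoint pages in the UI and the related logs, so that the problem can be located more precisely.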
> >
> >
> > Best !
> > zhisheng
> >
> >
> > Tianwang Li <li...@gmail.com> wrote on Sunday, June 28, 2020 at 10:17 AM:
> >
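> > > A question about Flink checkpoints occasionally taking a long time.
> > >
> > > *Environment and background:*
> > > Version: Flink 1.10.0
> > > Data volume: roughly 100,000 records per second; the data source is Kafka
> > > Computation: sliding-window aggregation; each window emits roughly 10 to 20 million records.
> > > Backpressure: the job is frequently backpressured (especially while a window is emitting its output).
> > >
> > > *Problem:*
> > >     Most of the time a checkpoint completes within 1 minute, but occasionally a checkpoint takes more than 30 minutes (this is infrequent, about once every 1~2 days).
> > > The source's checkpoint is what takes long: a lot of time passes between "Trigger checkpoint" and "Starting checkpoint".
> > >
> > > The checkpoint details are roughly as follows: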
> > >
> > > [image: image.png]
> > > [image: image.png]
> > > [image: image.png]
> > >
> > > 2020-06-24 21:09:53,369 DEBUG
> > > org.apache.flink.runtime.taskexecutor.TaskExecutor            - Trigger
> > > checkpoint 316@1593004193363 for 84dce1ec8aa5a4df2d1758d6e9278693.
> > >
> > > 2020-06-24 21:09:58,327 DEBUG
> > > org.apache.flink.runtime.taskexecutor.TaskExecutor            -
> Received
> > > heartbeat request from e88ea2f790430c9c160e540ef0546d60.
> > >
> > > 2020-06-24 21:09:59,266 DEBUG
> > > org.apache.flink.runtime.taskexecutor.TaskExecutor            -
> Received
> > > heartbeat request from b93d7167db364dfdcbda886944f1482f.
> > >
> > > 2020-06-24 21:09:59,686 INFO
> > org.apache.flink.runtime.taskexecutor.TaskManagerRunner
> > >       - Memory usage stats: [HEAP: 202/2573/2573 MB, NON HEAP:
> > > 111/114/424 MB (used/committed/max)]
> > >
> > > 2020-06-24 21:09:59,686 INFO
> > org.apache.flink.runtime.taskexecutor.TaskManagerRunner
> > >       - Direct memory stats: Count: 17403, Total Capacity: 583911423,
> > > Used Memory: 583911424
> > >
> > > 2020-06-24 21:09:59,686 INFO
> > org.apache.flink.runtime.taskexecutor.TaskManagerRunner
> > >       - Off-heap pool stats: [Code Cache: 35/35/240 MB
> > > (used/committed/max)], [Metaspace: 67/69/96 MB (used/committed/max)],
> > > [Compressed Class Space: 8/9/88 MB (used/committed/max)]
> > >
> > > 2020-06-24 21:09:59,686 INFO
> > org.apache.flink.runtime.taskexecutor.TaskManagerRunner
> > >       - Garbage collector stats: [PS Scavenge, GC TIME (ms): 108643, GC
> > > COUNT: 6981], [PS MarkSweep, GC TIME (ms): 1074, GC COUNT: 6]
> > >
> > > 2020-06-24 21:10:08,346 DEBUG
> > > org.apache.flink.runtime.taskexecutor.TaskExecutor            -
> Received
> > > heartbeat request from e88ea2f790430c9c160e540ef0546d60.
> > >
> > > 2020-06-24 21:10:09,286 DEBUG
> > > org.apache.flink.runtime.taskexecutor.TaskExecutor            -
> Received
> > > heartbeat request from b93d7167db364dfdcbda886944f1482f.
> > >
> > > 2020-06-24 21:10:09,686 INFO
> > org.apache.flink.runtime.taskexecutor.TaskManagerRunner
> > >       - Memory usage stats: [HEAP: 557/2573/2573 MB, NON HEAP:
> > > 111/114/424 MB (used/committed/max)]
> > >
> > > 2020-06-24 21:10:09,686 INFO
> > org.apache.flink.runtime.taskexecutor.TaskManagerRunner
> > >       - Direct memory stats: Count: 17403, Total Capacity: 583911423,
> > > Used Memory: 583911424
> > >
> > > 2020-06-24 21:10:09,686 INFO
> > org.apache.flink.runtime.taskexecutor.TaskManagerRunner
> > >       - Off-heap pool stats: [Code Cache: 35/35/240 MB
> > > (used/committed/max)], [Metaspace: 67/69/96 MB (used/committed/max)],
> > > [Compressed Class Space: 8/9/88 MB (used/committed/max)]
> > >
> > > 2020-06-24 21:10:09,686 INFO
> > org.apache.flink.runtime.taskexecutor.TaskManagerRunner
> > >       - Garbage collector stats: [PS Scavenge, GC TIME (ms): 108643, GC
> > > COUNT: 6981], [PS MarkSweep, GC TIME (ms): 1074, GC COUNT: 6]
> > >
> > >
> > > [... omitted ...]
> > >
> > >
> > > 2020-06-24 21:55:39,875 INFO
> > org.apache.flink.runtime.taskexecutor.TaskManagerRunner
> > >       - Direct memory stats: Count: 17403, Total Capacity: 583911423,
> > > Used Memory: 583911424
> > >
> > > 2020-06-24 21:55:39,875 INFO
> > org.apache.flink.runtime.taskexecutor.TaskManagerRunner
> > >       - Off-heap pool stats: [Code Cache: 35/35/240 MB
> > > (used/committed/max)], [Metaspace: 67/69/96 MB (used/committed/max)],
> > > [Compressed Class Space: 8/9/88 MB (used/committed/max)]
> > >
> > > 2020-06-24 21:55:39,876 INFO
> > org.apache.flink.runtime.taskexecutor.TaskManagerRunner
> > >       - Garbage collector stats: [PS Scavenge, GC TIME (ms): 110520, GC
> > > COUNT: 7083], [PS MarkSweep, GC TIME (ms): 1074, GC COUNT: 6]
> > >
> > > 2020-06-24 21:55:41,721 DEBUG
> > > org.apache.flink.streaming.runtime.tasks.StreamTask           -
> Starting
> > > checkpoint (316) CHECKPOINT on task Source: Custom Source -> Map ->
> > Filter
> > > -> Timestamps/Watermarks (4/10)
> > >
> > > 2020-06-24 21:55:41,721 DEBUG
> > > org.apache.flink.runtime.state.AbstractSnapshotStrategy       -
> > > DefaultOperatorStateBackend snapshot (FsCheckpointStorageLocation
> > >
> {fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@4a4b11cd
> > ,
> > > checkpointDirectory=hdfs://xxxxchk-316,
> > > sharedStateDirectory=hdfs://xxxxshared,
> > > taskOwnedStateDirectory=hdfs://xxxxtaskowned,
> > > metadataFilePath=hdfs://xxxxchk-316/_metadata, reference=(default),
> > > fileStateSizeThreshold=1024, writeBufferSize=4096}, synchronous part)
> in
> > > thread Thread[Source: Custom Source -> Map -> Filter ->
> > > Timestamps/Watermarks (4/10),5,Flink Task Threads] took 0 ms.
> > >
> > > 2020-06-24 21:55:41,721 DEBUG
> > > org.apache.flink.runtime.state.AbstractSnapshotStrategy       -
> > > DefaultOperatorStateBackend snapshot (FsCheckpointStorageLocation
> > >
> {fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@4a4b11cd
> > ,
> > > checkpointDirectory=hdfs://xxxxchk-316,
> > > sharedStateDirectory=hdfs://xxxxshared,
> > > taskOwnedStateDirectory=hdfs://xxxxtaskowned,
> > > metadataFilePath=hdfs://xxxxchk-316/_metadata, reference=(default),
> > > fileStateSizeThreshold=1024, writeBufferSize=4096}, synchronous part)
> in
> > > thread Thread[Source: Custom Source -> Map -> Filter ->
> > > Timestamps/Watermarks (4/10),5,Flink Task Threads] took 0 ms.
> > >
> > > 2020-06-24 21:55:41,721 DEBUG
> > > org.apache.flink.runtime.state.AbstractSnapshotStrategy       -
> > > DefaultOperatorStateBackend snapshot (FsCheckpointStorageLocation
> > >
> {fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@4a4b11cd
> > ,
> > > checkpointDirectory=hdfs://xxxxchk-316,
> > > sharedStateDirectory=hdfs://xxxxshared,
> > > taskOwnedStateDirectory=hdfs://xxxxtaskowned,
> > > metadataFilePath=hdfs://xxxxchk-316/_metadata, reference=(default),
> > > fileStateSizeThreshold=1024, writeBufferSize=4096}, synchronous part)
> in
> > > thread Thread[Source: Custom Source -> Map -> Filter ->
> > > Timestamps/Watermarks (4/10),5,Flink Task Threads] took 0 ms.
> > >
> > > 2020-06-24 21:55:41,721 DEBUG
> > > org.apache.flink.runtime.state.AbstractSnapshotStrategy       -
> > > DefaultOperatorStateBackend snapshot (FsCheckpointStorageLocation
> > >
> {fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@4a4b11cd
> > ,
> > > checkpointDirectory=hdfs://xxxxchk-316,
> > > sharedStateDirectory=hdfs://xxxxshared,
> > > taskOwnedStateDirectory=hdfs://xxxxtaskowned,
> > > metadataFilePath=hdfs://xxxxchk-316/_metadata, reference=(default),
> > > fileStateSizeThreshold=1024, writeBufferSize=4096}, synchronous part)
> in
> > > thread Thread[Source: Custom Source -> Map -> Filter ->
> > > Timestamps/Watermarks (4/10),5,Flink Task Threads] took 0 ms.
> > >
> > > 2020-06-24 21:55:41,721 DEBUG
> > > org.apache.flink.streaming.runtime.tasks.StreamTask           -
> Finished
> > > synchronous checkpoints for checkpoint 316 on task Source: Custom
> Source
> > ->
> > > Map -> Filter -> Timestamps/Watermarks (4/10)
> > >
> > > 2020-06-24 21:55:41,721 DEBUG
> > > org.apache.flink.streaming.runtime.tasks.StreamTask           - Source:
> > > Custom Source -> Map -> Filter -> Timestamps/Watermarks (4/10) -
> finished
> > > synchronous part of checkpoint 316. Alignment duration: 0 ms, snapshot
> > > duration 0 ms
> > >
> > > 2020-06-24 21:55:41,737 DEBUG
> > > org.apache.flink.runtime.state.AbstractSnapshotStrategy       -
> > > DefaultOperatorStateBackend snapshot (FsCheckpointStorageLocation
> > >
> {fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@4a4b11cd
> > ,
> > > checkpointDirectory=hdfs://xxxxchk-316,
> > > sharedStateDirectory=hdfs://xxxxshared,
> > > taskOwnedStateDirectory=hdfs://xxxxtaskowned,
> > > metadataFilePath=hdfs://xxxxchk-316/_metadata, reference=(default),
> > > fileStateSizeThreshold=1024, writeBufferSize=4096}, asynchronous part)
> in
> > > thread Thread[AsyncOperations-thread-316,5,Flink Task Threads] took 16
> > ms.
> > >
> > > 2020-06-24 21:55:41,738 DEBUG
> > > org.apache.flink.streaming.runtime.tasks.StreamTask           - Source:
> > > Custom Source -> Map -> Filter -> Timestamps/Watermarks (4/10) -
> finished
> > > asynchronous part of checkpoint 316. Asynchronous duration: 16 ms
> > >
> > > 2020-06-24 21:55:42,008 DEBUG
> > > org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner  -
> > > Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger,
> > > anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10)
> > > (a608e8a1ab75622dc57f164bdbb86743): Received barrier from channel 3.
> > >
> > > 2020-06-24 21:55:42,008 DEBUG
> > > org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner  -
> > > Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger,
> > > anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10)
> > > (a608e8a1ab75622dc57f164bdbb86743): Starting stream alignment for
> > > checkpoint 316.
> > >
> > > 2020-06-24 21:55:42,110 DEBUG
> > > org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner  -
> > > Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger,
> > > anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10)
> > > (a608e8a1ab75622dc57f164bdbb86743): Received barrier from channel 7.
> > >
> > > 2020-06-24 21:55:42,110 DEBUG
> > > org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner  -
> > > Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger,
> > > anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10)
> > > (a608e8a1ab75622dc57f164bdbb86743): Received barrier from channel 2.
> > >
> > > 2020-06-24 21:55:42,110 DEBUG
> > > org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner  -
> > > Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger,
> > > anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10)
> > > (a608e8a1ab75622dc57f164bdbb86743): Received barrier from channel 8.
> > >
> > > 2020-06-24 21:55:42,110 DEBUG
> > > org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner  -
> > > Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger,
> > > anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10)
> > > (a608e8a1ab75622dc57f164bdbb86743): Received barrier from channel 6.
> > >
> > > 2020-06-24 21:55:42,110 DEBUG
> > > org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner  -
> > > Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger,
> > > anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10)
> > > (a608e8a1ab75622dc57f164bdbb86743): Received barrier from channel 0.
> > >
> > > 2020-06-24 21:55:42,110 DEBUG
> > > org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner  -
> > > Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger,
> > > anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10)
> > > (a608e8a1ab75622dc57f164bdbb86743): Received barrier from channel 5.
> > >
> > > 2020-06-24 21:55:42,110 DEBUG
> > > org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner  -
> > > Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger,
> > > anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10)
> > > (a608e8a1ab75622dc57f164bdbb86743): Received barrier from channel 9.
> > >
> > > 2020-06-24 21:55:42,110 DEBUG
> > > org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner  -
> > > Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger,
> > > anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10)
> > > (a608e8a1ab75622dc57f164bdbb86743): Received barrier from channel 4.
> > >
> > > 2020-06-24 21:55:42,110 DEBUG
> > > org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner  -
> > > Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger,
> > > anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10)
> > > (a608e8a1ab75622dc57f164bdbb86743): Received barrier from channel 1.
> > >
> > > 2020-06-24 21:55:42,110 DEBUG
> > > org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner  -
> > > Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger,
> > > anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10)
> > > (a608e8a1ab75622dc57f164bdbb86743): Received all barriers, triggering
> > > checkpoint 316 at 1593004193363.
> > >
> > > 2020-06-24 21:55:42,110 DEBUG
> > > org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner  -
> > > Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger,
> > > anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10)
> > > (a608e8a1ab75622dc57f164bdbb86743): End of stream alignment, feeding
> > > buffered data back.
> > >
> > > 2020-06-24 21:55:42,110 DEBUG
> > > org.apache.flink.streaming.runtime.tasks.StreamTask           -
> Starting
> > > checkpoint (316) CHECKPOINT on task
> > > Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger,
> > > anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10)
> > >
> > > 2020-06-24 21:55:42,110 DEBUG
> > > org.apache.flink.runtime.state.AbstractSnapshotStrategy       -
> > > DefaultOperatorStateBackend snapshot (FsCheckpointStorageLocation
> > >
> {fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@61164119
> > ,
> > > checkpointDirectory=hdfs://xxxxchk-316,
> > > sharedStateDirectory=hdfs://xxxxshared,
> > > taskOwnedStateDirectory=hdfs://xxxxtaskowned,
> > > metadataFilePath=hdfs://xxxxchk-316/_metadata, reference=(default),
> > > fileStateSizeThreshold=1024, writeBufferSize=4096}, synchronous part)
> in
> > > thread Thread[Window(SlidingEventTimeWindows(10800000, 300000, 0),
> > > EventTimeTrigger, anon$11, ScalaProcessWindowFunctionWrapper) -> Map
> > > (10/10),5,Flink Task Threads] took 0 ms.
> > >
> > > 2020-06-24 21:55:42,110 DEBUG
> > > org.apache.flink.runtime.state.AbstractSnapshotStrategy       -
> > > DefaultOperatorStateBackend snapshot (FsCheckpointStorageLocation
> > >
> {fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@61164119
> > ,
> > > checkpointDirectory=hdfs://xxxxchk-316,
> > > sharedStateDirectory=hdfs://xxxxshared,
> > > taskOwnedStateDirectory=hdfs://xxxxtaskowned,
> > > metadataFilePath=hdfs://xxxxchk-316/_metadata, reference=(default),
> > > fileStateSizeThreshold=1024, writeBufferSize=4096}, synchronous part)
> in
> > > thread Thread[Window(SlidingEventTimeWindows(10800000, 300000, 0),
> > > EventTimeTrigger, anon$11, ScalaProcessWindowFunctionWrapper) -> Map
> > > (10/10),5,Flink Task Threads] took 0 ms.
> > >
> > > 2020-06-24 21:55:42,980 DEBUG
> > > org.apache.flink.runtime.state.AbstractSnapshotStrategy       -
> > > Asynchronous incremental RocksDB snapshot (FsCheckpointStorageLocation
> > >
> {fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@61164119
> > ,
> > > checkpointDirectory=hdfs://xxxx/chk-316,
> > > sharedStateDirectory=hdfs://xxxx/shared,
> > > taskOwnedStateDirectory=hdfs://xxxx/taskowned,
> > > metadataFilePath=hdfs://xxxx/chk-316/_metadata, reference=(default),
> > > fileStateSizeThreshold=1024, writeBufferSize=4096}, synchronous part)
> in
> > > thread Thread[Window(SlidingEventTimeWindows(10800000, 300000, 0),
> > > EventTimeTrigger, anon$11, ScalaProcessWindowFunctionWrapper) -> Map
> > > (10/10),5,Flink Task Threads] took 870 ms.
> > >
> > > 2020-06-24 21:55:42,981 DEBUG
> > > org.apache.flink.streaming.runtime.tasks.StreamTask           -
> Finished
> > > synchronous checkpoints for checkpoint 316 on task
> > > Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger,
> > > anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10)
> > >
> > > 2020-06-24 21:55:42,981 DEBUG
> > > org.apache.flink.streaming.runtime.tasks.StreamTask           -
> > > Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger,
> > > anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10) - finished
> > > synchronous part of checkpoint 316. Alignment duration: 101 ms,
> snapshot
> > > duration 870 ms
> > >
> > > 2020-06-24 21:55:42,981 DEBUG
> > > org.apache.flink.streaming.runtime.io.CachedBufferStorage     -
> > > Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger,
> > > anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10)
> > > (a608e8a1ab75622dc57f164bdbb86743): Size of buffered data: 98304 bytes
> > >
> > > 2020-06-24 21:55:42,981 DEBUG
> > > org.apache.flink.streaming.runtime.io.CachedBufferStorage     -
> > > Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger,
> > > anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10)
> > > (a608e8a1ab75622dc57f164bdbb86743): Finished feeding back buffered
> data.
> > >
> > > 2020-06-24 21:55:43,814 DEBUG
> > > org.apache.flink.runtime.taskexecutor.TaskExecutor            -
> Received
> > > heartbeat request from e88ea2f790430c9c160e540ef0546d60.
> > >
> > > 2020-06-24 21:55:44,758 DEBUG
> > > org.apache.flink.runtime.taskexecutor.TaskExecutor            -
> Received
> > > heartbeat request from b93d7167db364dfdcbda886944f1482f.
> > >
> > > 2020-06-24 21:55:45,714 DEBUG
> > > org.apache.flink.runtime.state.AbstractSnapshotStrategy       -
> > > Asynchronous incremental RocksDB snapshot (FsCheckpointStorageLocation
> > >
> {fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@61164119
> > ,
> > > checkpointDirectory=hdfs://xxxx/chk-316,
> > > sharedStateDirectory=hdfs://xxxx/shared,
> > > taskOwnedStateDirectory=hdfs://xxxx/taskowned,
> > > metadataFilePath=hdfs://xxxx/chk-316/_metadata, reference=(default),
> > > fileStateSizeThreshold=1024, writeBufferSize=4096}, asynchronous part)
> in
> > > thread Thread[AsyncOperations-thread-316,5,Flink Task Threads] took
> 2733
> > ms.
> > >
> > > 2020-06-24 21:55:45,715 DEBUG
> > > org.apache.flink.streaming.runtime.tasks.StreamTask           -
> > > Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger,
> > > anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10) - finished
> > > asynchronous part of checkpoint 316. Asynchronous duration: 2734 ms
> > >
> > > 2020-06-24 21:55:49,876 INFO
> > org.apache.flink.runtime.taskexecutor.TaskManagerRunner
> > >       - Memory usage stats: [HEAP: 220/2589/2589 MB, NON HEAP:
> > > 111/114/424 MB (used/committed/max)]
> > >
> > > 2020-06-24 21:55:49,876 INFO
> > org.apache.flink.runtime.taskexecutor.TaskManagerRunner
> > >       - Direct memory stats: Count: 17414, Total Capacity: 584128352,
> > > Used Memory: 584128353
> > >
> > > 2020-06-24 21:55:49,876 INFO
> > org.apache.flink.runtime.taskexecutor.TaskManagerRunner
> > >       - Off-heap pool stats: [Code Cache: 35/35/240 MB
> > > (used/committed/max)], [Metaspace: 67/69/96 MB (used/committed/max)],
> > > [Compressed Class Space: 8/9/88 MB (used/committed/max)]
> > >
> > > 2020-06-24 21:55:49,876 INFO
> > org.apache.flink.runtime.taskexecutor.TaskManagerRunner
> > >       - Garbage collector stats: [PS Scavenge, GC TIME (ms): 110525, GC
> > > COUNT: 7084], [PS MarkSweep, GC TIME (ms): 1074, GC COUNT: 6]
> > >
> > >
> > > --
> > > **************************************
> > >  tivanli
> > > **************************************
> > >
> >
>


-- 
**************************************
 tivanli
**************************************

Re: Flink-1.10.0: source checkpoint occasionally takes a long time

Posted by Tianwang Li <li...@gmail.com>.
To add to my earlier post, here are the screenshots of the checkpoint UI:


https://imgchr.com/i/NgCUgS

https://imgchr.com/i/NgChDJ

https://imgchr.com/i/NgCT4x




-- 
**************************************
 tivanli
**************************************

Re: Flink-1.10.0: source checkpoint occasionally takes a long time

Posted by LakeShen <sh...@gmail.com>.
Hi Tianwang Li,

Since the checkpoint timeout only shows up once every day or two, check whether some class of keys in your job suddenly grows in volume during that period.
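
One way to look for such a surge is to sample the keys the job sees around a slow checkpoint and compare the shares of the most frequent ones. The following is only a minimal sketch: the function name `top_key_shares` and the sample keys are hypothetical and do not come from the job discussed in this thread.

```python
from collections import Counter
from typing import Hashable, Iterable, List, Tuple


def top_key_shares(keys: Iterable[Hashable], n: int = 3) -> List[Tuple[Hashable, float]]:
    """Return the n most frequent keys together with their share of all records."""
    counts = Counter(keys)
    total = sum(counts.values())
    if total == 0:
        return []
    return [(key, count / total) for key, count in counts.most_common(n)]


# Hypothetical sample of keys extracted from the input around a slow checkpoint.
sample = ["user_a"] * 80 + ["user_b"] * 15 + ["user_c"] * 5

for key, share in top_key_shares(sample):
    print(f"{key}: {share:.0%} of sampled records")
```

If one key dominates the sample taken around a slow checkpoint but not a sample taken at a normal time, that would support the key-surge explanation.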

Best,
LakeShen

zhisheng <zh...@gmail.com> wrote on Sunday, June 28, 2020 at 10:27 AM:

> hi, Tianwang Li
>
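> I see that the three images are broken. You could try uploading them to a third-party image host and posting the links. Also:
>
> > The job frequently shows backpressure (especially when the windows emit their output)
>
> For this, check the operators downstream of the window. For example, the window may be emitting far more data while the sink parallelism is unchanged, so the sink cannot keep up and causes backpressure.
>
>
> > Most of the time a checkpoint completes within 1 minute, but occasionally one takes more than 30 minutes (this is infrequent, about once every 1-2 days)
>
> For this, you could also check whether HDFS is occasionally under heavy load, which would cause latency spikes.
>
> I also suggest adding the checkpoint-related screenshots from the UI and the relevant logs, which will make it easier to pinpoint the problem.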
>
>
> Best !
> zhisheng
>

Re: Flink-1.10.0: source checkpoint occasionally takes a long time

Posted by zhisheng <zh...@gmail.com>.
hi, Tianwang Li

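I see that the three images are broken. You could try uploading them to a third-party image host and posting the links. Also:

> The job frequently shows backpressure (especially when the windows emit their output)

For this, check the operators downstream of the window. For example, the window may be emitting far more data while the sink parallelism is unchanged, so the sink cannot keep up and causes backpressure.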
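
As a rough back-of-the-envelope check of the downstream load, the figures from the original post can be turned into a required drain rate. This is only a sketch: the 5-minute slide comes from `SlidingEventTimeWindows(10800000, 300000, 0)` in the log and the 10-20 million records per firing from the post, while the downstream parallelism of 10 is an assumption (the real sink parallelism is not stated in the thread), and the helper name `required_rate` is made up for illustration.

```python
# Figures taken from the thread: every window firing emits roughly 10-20 million
# records, and the log shows SlidingEventTimeWindows(10800000, 300000, 0),
# i.e. a 5-minute slide between firings.
SLIDE_MS = 300_000
RECORDS_PER_FIRING = (10_000_000, 20_000_000)


def required_rate(records: int, slide_ms: int, parallelism: int) -> float:
    """Records per second that each downstream subtask must absorb so that
    one window firing is fully drained before the next firing starts."""
    return records / (slide_ms / 1000) / parallelism


# ASSUMPTION: parallelism=10 is used only for illustration. It matches the
# "(10/10)" window subtask seen in the log, not a known sink parallelism.
for records in RECORDS_PER_FIRING:
    per_subtask = required_rate(records, SLIDE_MS, parallelism=10)
    print(f"{records:>10,d} records/firing -> {per_subtask:,.0f} records/s per subtask")
```

The calculation assumes the output is spread evenly over the slide interval and over the subtasks. If the sink sustains less than this rate, output from one firing is still queued when the next window fires, which would be consistent with backpressure appearing around window output.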


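> Most of the time a checkpoint completes within 1 minute, but occasionally one takes more than 30 minutes (this is infrequent, about once every 1-2 days)

For this, you could also check whether HDFS is occasionally under heavy load, which would cause latency spikes.

I also suggest adding the checkpoint-related screenshots from the UI and the relevant logs, which will make it easier to pinpoint the problem.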
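
When reading such logs, it can help to measure the gap between the "Trigger checkpoint" line and the "Starting checkpoint" line directly. The sketch below uses the two timestamps for checkpoint 316 that appear in the log quoted in this thread; the helper name `gap_seconds` is made up for illustration.

```python
from datetime import datetime

# Two timestamps copied from the TaskManager log quoted in this thread.
TRIGGER_TS = "2020-06-24 21:09:53,369"  # TaskExecutor: "Trigger checkpoint 316@1593004193363 ..."
START_TS = "2020-06-24 21:55:41,721"    # StreamTask: "Starting checkpoint (316) ... Source ... (4/10)"

# Timestamp layout used by these log lines (comma before the milliseconds).
LOG_TS_FORMAT = "%Y-%m-%d %H:%M:%S,%f"


def gap_seconds(earlier: str, later: str) -> float:
    """Return the elapsed seconds between two timestamps in the log's format."""
    t0 = datetime.strptime(earlier, LOG_TS_FORMAT)
    t1 = datetime.strptime(later, LOG_TS_FORMAT)
    return (t1 - t0).total_seconds()


delay = gap_seconds(TRIGGER_TS, START_TS)
minutes, seconds = divmod(delay, 60)
print(f"trigger -> start delay: {delay:.3f} s ({int(minutes)} min {seconds:.3f} s)")
```

For these two lines the gap is about 45 minutes 48 seconds, whereas the synchronous and asynchronous snapshot parts of the source task each report at most 16 ms, so nearly all of the time passes before the source task starts the checkpoint.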


Best !
zhisheng


Tianwang Li <li...@gmail.com> wrote on Sunday, June 28, 2020 at 10:17 AM:

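> About the problem of Flink checkpoints occasionally taking a long time.
>
> *Environment and background:*
> Version: Flink 1.10.0
> Data volume: about 100,000 records per second; the data source is Kafka
> Computation logic: sliding-window aggregation; each window emits roughly 10-20 million records.
> Backpressure: the job frequently shows backpressure (especially when the windows emit their output).
>
> *Problem:*
>     Most of the time a checkpoint completes within 1 minute, but occasionally one takes more than 30 minutes (this is infrequent, about once every 1-2 days).
> The source's checkpoint takes a long time: the time from "Trigger checkpoint" to "Starting checkpoint" is long.
>
> The checkpoint situation is roughly as follows: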
>
> [image: image.png]
> [image: image.png]
> [image: image.png]
>
> 2020-06-24 21:09:53,369 DEBUG
> org.apache.flink.runtime.taskexecutor.TaskExecutor            - Trigger
> checkpoint 316@1593004193363 for 84dce1ec8aa5a4df2d1758d6e9278693.
>
> 2020-06-24 21:09:58,327 DEBUG
> org.apache.flink.runtime.taskexecutor.TaskExecutor            - Received
> heartbeat request from e88ea2f790430c9c160e540ef0546d60.
>
> 2020-06-24 21:09:59,266 DEBUG
> org.apache.flink.runtime.taskexecutor.TaskExecutor            - Received
> heartbeat request from b93d7167db364dfdcbda886944f1482f.
>
> 2020-06-24 21:09:59,686 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner
>       - Memory usage stats: [HEAP: 202/2573/2573 MB, NON HEAP:
> 111/114/424 MB (used/committed/max)]
>
> 2020-06-24 21:09:59,686 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner
>       - Direct memory stats: Count: 17403, Total Capacity: 583911423,
> Used Memory: 583911424
>
> 2020-06-24 21:09:59,686 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner
>       - Off-heap pool stats: [Code Cache: 35/35/240 MB
> (used/committed/max)], [Metaspace: 67/69/96 MB (used/committed/max)],
> [Compressed Class Space: 8/9/88 MB (used/committed/max)]
>
> 2020-06-24 21:09:59,686 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner
>       - Garbage collector stats: [PS Scavenge, GC TIME (ms): 108643, GC
> COUNT: 6981], [PS MarkSweep, GC TIME (ms): 1074, GC COUNT: 6]
>
> 2020-06-24 21:10:08,346 DEBUG
> org.apache.flink.runtime.taskexecutor.TaskExecutor            - Received
> heartbeat request from e88ea2f790430c9c160e540ef0546d60.
>
> 2020-06-24 21:10:09,286 DEBUG
> org.apache.flink.runtime.taskexecutor.TaskExecutor            - Received
> heartbeat request from b93d7167db364dfdcbda886944f1482f.
>
> 2020-06-24 21:10:09,686 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner
>       - Memory usage stats: [HEAP: 557/2573/2573 MB, NON HEAP:
> 111/114/424 MB (used/committed/max)]
>
> 2020-06-24 21:10:09,686 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner
>       - Direct memory stats: Count: 17403, Total Capacity: 583911423,
> Used Memory: 583911424
>
> 2020-06-24 21:10:09,686 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner
>       - Off-heap pool stats: [Code Cache: 35/35/240 MB
> (used/committed/max)], [Metaspace: 67/69/96 MB (used/committed/max)],
> [Compressed Class Space: 8/9/88 MB (used/committed/max)]
>
> 2020-06-24 21:10:09,686 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner
>       - Garbage collector stats: [PS Scavenge, GC TIME (ms): 108643, GC
> COUNT: 6981], [PS MarkSweep, GC TIME (ms): 1074, GC COUNT: 6]
>
>
> (log lines omitted ...)
>
>
> 2020-06-24 21:55:39,875 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner
>       - Direct memory stats: Count: 17403, Total Capacity: 583911423,
> Used Memory: 583911424
>
> 2020-06-24 21:55:39,875 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner
>       - Off-heap pool stats: [Code Cache: 35/35/240 MB
> (used/committed/max)], [Metaspace: 67/69/96 MB (used/committed/max)],
> [Compressed Class Space: 8/9/88 MB (used/committed/max)]
>
> 2020-06-24 21:55:39,876 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner
>       - Garbage collector stats: [PS Scavenge, GC TIME (ms): 110520, GC
> COUNT: 7083], [PS MarkSweep, GC TIME (ms): 1074, GC COUNT: 6]
>
> 2020-06-24 21:55:41,721 DEBUG
> org.apache.flink.streaming.runtime.tasks.StreamTask           - Starting
> checkpoint (316) CHECKPOINT on task Source: Custom Source -> Map -> Filter
> -> Timestamps/Watermarks (4/10)
>
> 2020-06-24 21:55:41,721 DEBUG
> org.apache.flink.runtime.state.AbstractSnapshotStrategy       -
> DefaultOperatorStateBackend snapshot (FsCheckpointStorageLocation
> {fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@4a4b11cd,
> checkpointDirectory=hdfs://xxxxchk-316,
> sharedStateDirectory=hdfs://xxxxshared,
> taskOwnedStateDirectory=hdfs://xxxxtaskowned,
> metadataFilePath=hdfs://xxxxchk-316/_metadata, reference=(default),
> fileStateSizeThreshold=1024, writeBufferSize=4096}, synchronous part) in
> thread Thread[Source: Custom Source -> Map -> Filter ->
> Timestamps/Watermarks (4/10),5,Flink Task Threads] took 0 ms.
>
> 2020-06-24 21:55:41,721 DEBUG
> org.apache.flink.runtime.state.AbstractSnapshotStrategy       -
> DefaultOperatorStateBackend snapshot (FsCheckpointStorageLocation
> {fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@4a4b11cd,
> checkpointDirectory=hdfs://xxxxchk-316,
> sharedStateDirectory=hdfs://xxxxshared,
> taskOwnedStateDirectory=hdfs://xxxxtaskowned,
> metadataFilePath=hdfs://xxxxchk-316/_metadata, reference=(default),
> fileStateSizeThreshold=1024, writeBufferSize=4096}, synchronous part) in
> thread Thread[Source: Custom Source -> Map -> Filter ->
> Timestamps/Watermarks (4/10),5,Flink Task Threads] took 0 ms.
>
> 2020-06-24 21:55:41,721 DEBUG
> org.apache.flink.runtime.state.AbstractSnapshotStrategy       -
> DefaultOperatorStateBackend snapshot (FsCheckpointStorageLocation
> {fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@4a4b11cd,
> checkpointDirectory=hdfs://xxxxchk-316,
> sharedStateDirectory=hdfs://xxxxshared,
> taskOwnedStateDirectory=hdfs://xxxxtaskowned,
> metadataFilePath=hdfs://xxxxchk-316/_metadata, reference=(default),
> fileStateSizeThreshold=1024, writeBufferSize=4096}, synchronous part) in
> thread Thread[Source: Custom Source -> Map -> Filter ->
> Timestamps/Watermarks (4/10),5,Flink Task Threads] took 0 ms.
>
> 2020-06-24 21:55:41,721 DEBUG
> org.apache.flink.runtime.state.AbstractSnapshotStrategy       -
> DefaultOperatorStateBackend snapshot (FsCheckpointStorageLocation
> {fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@4a4b11cd,
> checkpointDirectory=hdfs://xxxxchk-316,
> sharedStateDirectory=hdfs://xxxxshared,
> taskOwnedStateDirectory=hdfs://xxxxtaskowned,
> metadataFilePath=hdfs://xxxxchk-316/_metadata, reference=(default),
> fileStateSizeThreshold=1024, writeBufferSize=4096}, synchronous part) in
> thread Thread[Source: Custom Source -> Map -> Filter ->
> Timestamps/Watermarks (4/10),5,Flink Task Threads] took 0 ms.
>
> 2020-06-24 21:55:41,721 DEBUG
> org.apache.flink.streaming.runtime.tasks.StreamTask           - Finished
> synchronous checkpoints for checkpoint 316 on task Source: Custom Source ->
> Map -> Filter -> Timestamps/Watermarks (4/10)
>
> 2020-06-24 21:55:41,721 DEBUG
> org.apache.flink.streaming.runtime.tasks.StreamTask           - Source:
> Custom Source -> Map -> Filter -> Timestamps/Watermarks (4/10) - finished
> synchronous part of checkpoint 316. Alignment duration: 0 ms, snapshot
> duration 0 ms
>
> 2020-06-24 21:55:41,737 DEBUG
> org.apache.flink.runtime.state.AbstractSnapshotStrategy       -
> DefaultOperatorStateBackend snapshot (FsCheckpointStorageLocation
> {fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@4a4b11cd,
> checkpointDirectory=hdfs://xxxxchk-316,
> sharedStateDirectory=hdfs://xxxxshared,
> taskOwnedStateDirectory=hdfs://xxxxtaskowned,
> metadataFilePath=hdfs://xxxxchk-316/_metadata, reference=(default),
> fileStateSizeThreshold=1024, writeBufferSize=4096}, asynchronous part) in
> thread Thread[AsyncOperations-thread-316,5,Flink Task Threads] took 16 ms.
>
> 2020-06-24 21:55:41,738 DEBUG
> org.apache.flink.streaming.runtime.tasks.StreamTask           - Source:
> Custom Source -> Map -> Filter -> Timestamps/Watermarks (4/10) - finished
> asynchronous part of checkpoint 316. Asynchronous duration: 16 ms
>
> 2020-06-24 21:55:42,008 DEBUG
> org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner  -
> Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger,
> anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10)
> (a608e8a1ab75622dc57f164bdbb86743): Received barrier from channel 3.
>
> 2020-06-24 21:55:42,008 DEBUG
> org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner  -
> Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger,
> anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10)
> (a608e8a1ab75622dc57f164bdbb86743): Starting stream alignment for
> checkpoint 316.
>
> 2020-06-24 21:55:42,110 DEBUG
> org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner  -
> Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger,
> anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10)
> (a608e8a1ab75622dc57f164bdbb86743): Received barrier from channel 7.
>
> 2020-06-24 21:55:42,110 DEBUG
> org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner  -
> Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger,
> anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10)
> (a608e8a1ab75622dc57f164bdbb86743): Received barrier from channel 2.
>
> 2020-06-24 21:55:42,110 DEBUG
> org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner  -
> Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger,
> anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10)
> (a608e8a1ab75622dc57f164bdbb86743): Received barrier from channel 8.
>
> 2020-06-24 21:55:42,110 DEBUG
> org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner  -
> Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger,
> anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10)
> (a608e8a1ab75622dc57f164bdbb86743): Received barrier from channel 6.
>
> 2020-06-24 21:55:42,110 DEBUG
> org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner  -
> Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger,
> anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10)
> (a608e8a1ab75622dc57f164bdbb86743): Received barrier from channel 0.
>
> 2020-06-24 21:55:42,110 DEBUG
> org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner  -
> Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger,
> anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10)
> (a608e8a1ab75622dc57f164bdbb86743): Received barrier from channel 5.
>
> 2020-06-24 21:55:42,110 DEBUG
> org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner  -
> Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger,
> anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10)
> (a608e8a1ab75622dc57f164bdbb86743): Received barrier from channel 9.
>
> 2020-06-24 21:55:42,110 DEBUG
> org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner  -
> Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger,
> anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10)
> (a608e8a1ab75622dc57f164bdbb86743): Received barrier from channel 4.
>
> 2020-06-24 21:55:42,110 DEBUG
> org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner  -
> Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger,
> anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10)
> (a608e8a1ab75622dc57f164bdbb86743): Received barrier from channel 1.
>
> 2020-06-24 21:55:42,110 DEBUG
> org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner  -
> Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger,
> anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10)
> (a608e8a1ab75622dc57f164bdbb86743): Received all barriers, triggering
> checkpoint 316 at 1593004193363.
>
> 2020-06-24 21:55:42,110 DEBUG
> org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner  -
> Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger,
> anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10)
> (a608e8a1ab75622dc57f164bdbb86743): End of stream alignment, feeding
> buffered data back.
>
> 2020-06-24 21:55:42,110 DEBUG
> org.apache.flink.streaming.runtime.tasks.StreamTask           - Starting
> checkpoint (316) CHECKPOINT on task
> Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger,
> anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10)
>
> 2020-06-24 21:55:42,110 DEBUG
> org.apache.flink.runtime.state.AbstractSnapshotStrategy       -
> DefaultOperatorStateBackend snapshot (FsCheckpointStorageLocation
> {fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@61164119,
> checkpointDirectory=hdfs://xxxxchk-316,
> sharedStateDirectory=hdfs://xxxxshared,
> taskOwnedStateDirectory=hdfs://xxxxtaskowned,
> metadataFilePath=hdfs://xxxxchk-316/_metadata, reference=(default),
> fileStateSizeThreshold=1024, writeBufferSize=4096}, synchronous part) in
> thread Thread[Window(SlidingEventTimeWindows(10800000, 300000, 0),
> EventTimeTrigger, anon$11, ScalaProcessWindowFunctionWrapper) -> Map
> (10/10),5,Flink Task Threads] took 0 ms.
>
> 2020-06-24 21:55:42,110 DEBUG
> org.apache.flink.runtime.state.AbstractSnapshotStrategy       -
> DefaultOperatorStateBackend snapshot (FsCheckpointStorageLocation
> {fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@61164119,
> checkpointDirectory=hdfs://xxxxchk-316,
> sharedStateDirectory=hdfs://xxxxshared,
> taskOwnedStateDirectory=hdfs://xxxxtaskowned,
> metadataFilePath=hdfs://xxxxchk-316/_metadata, reference=(default),
> fileStateSizeThreshold=1024, writeBufferSize=4096}, synchronous part) in
> thread Thread[Window(SlidingEventTimeWindows(10800000, 300000, 0),
> EventTimeTrigger, anon$11, ScalaProcessWindowFunctionWrapper) -> Map
> (10/10),5,Flink Task Threads] took 0 ms.
>
> 2020-06-24 21:55:42,980 DEBUG
> org.apache.flink.runtime.state.AbstractSnapshotStrategy       -
> Asynchronous incremental RocksDB snapshot (FsCheckpointStorageLocation
> {fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@61164119,
> checkpointDirectory=hdfs://xxxx/chk-316,
> sharedStateDirectory=hdfs://xxxx/shared,
> taskOwnedStateDirectory=hdfs://xxxx/taskowned,
> metadataFilePath=hdfs://xxxx/chk-316/_metadata, reference=(default),
> fileStateSizeThreshold=1024, writeBufferSize=4096}, synchronous part) in
> thread Thread[Window(SlidingEventTimeWindows(10800000, 300000, 0),
> EventTimeTrigger, anon$11, ScalaProcessWindowFunctionWrapper) -> Map
> (10/10),5,Flink Task Threads] took 870 ms.
>
> 2020-06-24 21:55:42,981 DEBUG
> org.apache.flink.streaming.runtime.tasks.StreamTask           - Finished
> synchronous checkpoints for checkpoint 316 on task
> Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger,
> anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10)
>
> 2020-06-24 21:55:42,981 DEBUG
> org.apache.flink.streaming.runtime.tasks.StreamTask           -
> Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger,
> anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10) - finished
> synchronous part of checkpoint 316. Alignment duration: 101 ms, snapshot
> duration 870 ms
>
> 2020-06-24 21:55:42,981 DEBUG
> org.apache.flink.streaming.runtime.io.CachedBufferStorage     -
> Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger,
> anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10)
> (a608e8a1ab75622dc57f164bdbb86743): Size of buffered data: 98304 bytes
>
> 2020-06-24 21:55:42,981 DEBUG
> org.apache.flink.streaming.runtime.io.CachedBufferStorage     -
> Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger,
> anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10)
> (a608e8a1ab75622dc57f164bdbb86743): Finished feeding back buffered data.
>
> 2020-06-24 21:55:43,814 DEBUG
> org.apache.flink.runtime.taskexecutor.TaskExecutor            - Received
> heartbeat request from e88ea2f790430c9c160e540ef0546d60.
>
> 2020-06-24 21:55:44,758 DEBUG
> org.apache.flink.runtime.taskexecutor.TaskExecutor            - Received
> heartbeat request from b93d7167db364dfdcbda886944f1482f.
>
> 2020-06-24 21:55:45,714 DEBUG
> org.apache.flink.runtime.state.AbstractSnapshotStrategy       -
> Asynchronous incremental RocksDB snapshot (FsCheckpointStorageLocation
> {fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@61164119,
> checkpointDirectory=hdfs://xxxx/chk-316,
> sharedStateDirectory=hdfs://xxxx/shared,
> taskOwnedStateDirectory=hdfs://xxxx/taskowned,
> metadataFilePath=hdfs://xxxx/chk-316/_metadata, reference=(default),
> fileStateSizeThreshold=1024, writeBufferSize=4096}, asynchronous part) in
> thread Thread[AsyncOperations-thread-316,5,Flink Task Threads] took 2733 ms.
>
> 2020-06-24 21:55:45,715 DEBUG
> org.apache.flink.streaming.runtime.tasks.StreamTask           -
> Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger,
> anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10) - finished
> asynchronous part of checkpoint 316. Asynchronous duration: 2734 ms
>
> 2020-06-24 21:55:49,876 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner
>       - Memory usage stats: [HEAP: 220/2589/2589 MB, NON HEAP:
> 111/114/424 MB (used/committed/max)]
>
> 2020-06-24 21:55:49,876 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner
>       - Direct memory stats: Count: 17414, Total Capacity: 584128352,
> Used Memory: 584128353
>
> 2020-06-24 21:55:49,876 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner
>       - Off-heap pool stats: [Code Cache: 35/35/240 MB
> (used/committed/max)], [Metaspace: 67/69/96 MB (used/committed/max)],
> [Compressed Class Space: 8/9/88 MB (used/committed/max)]
>
> 2020-06-24 21:55:49,876 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner
>       - Garbage collector stats: [PS Scavenge, GC TIME (ms): 110525, GC
> COUNT: 7084], [PS MarkSweep, GC TIME (ms): 1074, GC COUNT: 6]
>
>
> --
> **************************************
>  tivanli
> **************************************
>

Re: Flink-1.10.0 source checkpoint occasionally takes a long time

Posted by Tianwang Li <li...@gmail.com>.
>
> Occasionally a checkpoint takes more than 30 minutes (this is infrequent, about once every 1~2 days)

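I found the cause. The job had fallen far behind, and the Kafka data it had not yet consumed expired and was cleaned up, so consumption resumed from the latest position ("last") and the watermark suddenly jumped forward by several hours.
At that point every window covering those hours had to be emitted. Normally only one window is emitted at a time, but this time 30 windows had to be emitted at once, which took a long time.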
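
To make the effect of the watermark jump concrete, here is a minimal sketch that counts how many sliding windows become eligible to fire when the watermark advances. The window size (10800000 ms) and slide (300000 ms) are taken from the `SlidingEventTimeWindows(10800000, 300000, 0)` operator name in the logs. The function name `windows_fired`, the starting watermark, and the 2.5-hour jump are assumptions chosen only to illustrate the "30 windows at once" case; the thread does not state the exact size of the jump. The sketch also assumes an event-time trigger that fires once the watermark reaches the window end minus 1 ms, and that the window size is a multiple of the slide.

```python
def windows_fired(old_watermark_ms, new_watermark_ms, size_ms, slide_ms):
    """Count sliding windows (offset 0) whose firing point is crossed
    when the watermark advances from old_watermark_ms to new_watermark_ms.

    Assumes a window [start, end) fires once watermark >= end - 1 and that
    size_ms is a multiple of slide_ms, so window ends fall on multiples of
    slide_ms. This is an upper bound per key: windows without data do not fire.
    """
    if size_ms % slide_ms != 0:
        raise ValueError("this sketch assumes size_ms is a multiple of slide_ms")
    if new_watermark_ms < old_watermark_ms:
        raise ValueError("watermarks do not move backwards")
    # Window ends e (multiples of slide_ms) with old_watermark < e - 1 <= new_watermark.
    return (new_watermark_ms + 1) // slide_ms - (old_watermark_ms + 1) // slide_ms


SIZE_MS = 10_800_000   # 3 hours, from the operator name in the logs
SLIDE_MS = 300_000     # 5 minutes, from the operator name in the logs

# Hypothetical starting watermark, used only for illustration.
old_wm = 1_593_000_000_000

# Steady state: the watermark advances by one slide, so one window fires.
normal = windows_fired(old_wm, old_wm + SLIDE_MS, SIZE_MS, SLIDE_MS)

# Hypothetical jump of 2.5 hours after the expired Kafka data was skipped.
jump = windows_fired(old_wm, old_wm + 9_000_000, SIZE_MS, SLIDE_MS)

print(normal, jump)
```

With each window emitting roughly 10~20 million records, as described in the original question, a burst of this size would keep the window operator busy and back-pressured for far longer than usual.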

Because this is a stability-test job, I had not been watching for Kafka lag, which is what let the data expire.

Thanks to zhisheng, LakeShen, and Yichao Yang.


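On Mon, Jun 29, 2020 at 7:58 PM, Yichao Yang <10...@qq.com> wrote:

> Hi
>
> If the job frequently experiences backpressure, resolve the backpressure first, because backpressure also affects checkpointing. Beyond that, keep an eye on the job's physical resource metrics, such as CPU usage, memory usage, and whether the GC frequency is unusually high, and try to keep resource usage at a normal level.
>
> Best,
> Yichao Yang
>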



-- 
**************************************
 tivanli
**************************************

Re: Flink-1.10.0 source checkpoint occasionally takes a long time

Posted by Yichao Yang <10...@qq.com>.
Hi


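If the job frequently experiences backpressure, resolve the backpressure first, because backpressure also affects checkpointing. Beyond that, keep an eye on the job's physical resource metrics, such as CPU usage, memory usage, and whether the GC frequency is unusually high, and try to keep resource usage at a normal level.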
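
As one way to act on the GC suggestion, the sketch below compares two "Garbage collector stats" lines from the TaskManager log quoted in this thread and reports how much GC happened between them. The two log lines are copied from the thread with whitespace normalized; the helper names `parse_gc_line` and `gc_between` are hypothetical and not part of Flink.

```python
import re
from datetime import datetime

# Two "Garbage collector stats" lines from the TaskManager log quoted in
# this thread (whitespace normalized).
GC_LOG_LINES = [
    "2020-06-24 21:09:59,686 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner"
    " - Garbage collector stats: [PS Scavenge, GC TIME (ms): 108643, GC COUNT: 6981],"
    " [PS MarkSweep, GC TIME (ms): 1074, GC COUNT: 6]",
    "2020-06-24 21:55:49,876 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner"
    " - Garbage collector stats: [PS Scavenge, GC TIME (ms): 110525, GC COUNT: 7084],"
    " [PS MarkSweep, GC TIME (ms): 1074, GC COUNT: 6]",
]

# Matches one "[<collector>, GC TIME (ms): <n>, GC COUNT: <n>]" entry.
GC_ENTRY = re.compile(r"\[([^,\]]+), GC TIME \(ms\): (\d+), GC COUNT: (\d+)\]")


def parse_gc_line(line):
    """Return (timestamp, {collector: (cumulative_gc_ms, cumulative_gc_count)})."""
    timestamp = datetime.strptime(line[:23], "%Y-%m-%d %H:%M:%S,%f")
    stats = {name: (int(ms), int(count)) for name, ms, count in GC_ENTRY.findall(line)}
    return timestamp, stats


def gc_between(first_line, second_line):
    """Return {collector: (gc_ms, gc_count, share_of_wall_clock)} between two samples."""
    t0, s0 = parse_gc_line(first_line)
    t1, s1 = parse_gc_line(second_line)
    elapsed_ms = (t1 - t0).total_seconds() * 1000
    report = {}
    for name in s0:
        gc_ms = s1[name][0] - s0[name][0]
        gc_count = s1[name][1] - s0[name][1]
        report[name] = (gc_ms, gc_count, gc_ms / elapsed_ms)
    return report


report = gc_between(*GC_LOG_LINES)
for name, (gc_ms, gc_count, share) in report.items():
    print(f"{name}: {gc_count} collections, {gc_ms} ms, {share:.4%} of wall-clock time")
```

For these two samples, taken roughly 46 minutes apart while checkpoint 316 was pending, PS Scavenge accounts for 103 collections totalling 1882 ms and PS MarkSweep did not run at all, which suggests that GC pressure on this TaskManager was not what delayed the checkpoint.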


Best,
Yichao Yang




------------------ Original Message ------------------
From: "Tianwang Li" <litianwang@gmail.com>
Sent: Sunday, June 28, 2020, 10:17 AM
To: "user-zh" <user-zh@flink.apache.org>

Subject: Flink-1.10.0 source checkpoint occasionally takes a long time



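A question about Flink checkpoints occasionally taking a long time.
Environment and background:
Version: Flink 1.10.0
Data volume: roughly 100,000 records per second; the data source is Kafka.
Computation: sliding-window aggregation; each window emits roughly 10 to 20 million records.
Backpressure: the job is frequently backpressured (especially while a window is emitting its output).


Problem:
    Most checkpoints complete within 1 minute, but occasionally a checkpoint takes more than 30 minutes (this is infrequent, about once every 1 to 2 days).
The checkpoint on the source takes a long time: the gap between "Trigger checkpoint" and "Starting checkpoint" is long.


The checkpoint looks roughly like this: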













 
2020-06-24 21:09:53,369 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor            - Trigger checkpoint 316@1593004193363 for 84dce1ec8aa5a4df2d1758d6e9278693.

2020-06-24 21:09:58,327 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor            - Received heartbeat request from e88ea2f790430c9c160e540ef0546d60.

2020-06-24 21:09:59,266 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor            - Received heartbeat request from b93d7167db364dfdcbda886944f1482f.

2020-06-24 21:09:59,686 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner       - Memory usage stats: [HEAP: 202/2573/2573 MB, NON HEAP: 111/114/424 MB (used/committed/max)]

2020-06-24 21:09:59,686 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner       - Direct memory stats: Count: 17403, Total Capacity: 583911423, Used Memory: 583911424

2020-06-24 21:09:59,686 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner       - Off-heap pool stats: [Code Cache: 35/35/240 MB (used/committed/max)], [Metaspace: 67/69/96 MB (used/committed/max)], [Compressed Class Space: 8/9/88 MB (used/committed/max)]

2020-06-24 21:09:59,686 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner       - Garbage collector stats: [PS Scavenge, GC TIME (ms): 108643, GC COUNT: 6981], [PS MarkSweep, GC TIME (ms): 1074, GC COUNT: 6]

2020-06-24 21:10:08,346 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor            - Received heartbeat request from e88ea2f790430c9c160e540ef0546d60.

2020-06-24 21:10:09,286 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor            - Received heartbeat request from b93d7167db364dfdcbda886944f1482f.

2020-06-24 21:10:09,686 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner       - Memory usage stats: [HEAP: 557/2573/2573 MB, NON HEAP: 111/114/424 MB (used/committed/max)]

2020-06-24 21:10:09,686 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner       - Direct memory stats: Count: 17403, Total Capacity: 583911423, Used Memory: 583911424

2020-06-24 21:10:09,686 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner       - Off-heap pool stats: [Code Cache: 35/35/240 MB (used/committed/max)], [Metaspace: 67/69/96 MB (used/committed/max)], [Compressed Class Space: 8/9/88 MB (used/committed/max)]

2020-06-24 21:10:09,686 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner       - Garbage collector stats: [PS Scavenge, GC TIME (ms): 108643, GC COUNT: 6981], [PS MarkSweep, GC TIME (ms): 1074, GC COUNT: 6]
 


 
(lines omitted ...)
 


 
2020-06-24 21:55:39,875 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner       - Direct memory stats: Count: 17403, Total Capacity: 583911423, Used Memory: 583911424

2020-06-24 21:55:39,875 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner       - Off-heap pool stats: [Code Cache: 35/35/240 MB (used/committed/max)], [Metaspace: 67/69/96 MB (used/committed/max)], [Compressed Class Space: 8/9/88 MB (used/committed/max)]

2020-06-24 21:55:39,876 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner       - Garbage collector stats: [PS Scavenge, GC TIME (ms): 110520, GC COUNT: 7083], [PS MarkSweep, GC TIME (ms): 1074, GC COUNT: 6]

2020-06-24 21:55:41,721 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask           - Starting checkpoint (316) CHECKPOINT on task Source: Custom Source -> Map -> Filter -> Timestamps/Watermarks (4/10)

2020-06-24 21:55:41,721 DEBUG org.apache.flink.runtime.state.AbstractSnapshotStrategy       - DefaultOperatorStateBackend snapshot (FsCheckpointStorageLocation {fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@4a4b11cd, checkpointDirectory=hdfs://xxxxchk-316, sharedStateDirectory=hdfs://xxxxshared, taskOwnedStateDirectory=hdfs://xxxxtaskowned, metadataFilePath=hdfs://xxxxchk-316/_metadata, reference=(default), fileStateSizeThreshold=1024, writeBufferSize=4096}, synchronous part) in thread Thread[Source: Custom Source -> Map -> Filter -> Timestamps/Watermarks (4/10),5,Flink Task Threads] took 0 ms.

2020-06-24 21:55:41,721 DEBUG org.apache.flink.runtime.state.AbstractSnapshotStrategy       - DefaultOperatorStateBackend snapshot (FsCheckpointStorageLocation {fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@4a4b11cd, checkpointDirectory=hdfs://xxxxchk-316, sharedStateDirectory=hdfs://xxxxshared, taskOwnedStateDirectory=hdfs://xxxxtaskowned, metadataFilePath=hdfs://xxxxchk-316/_metadata, reference=(default), fileStateSizeThreshold=1024, writeBufferSize=4096}, synchronous part) in thread Thread[Source: Custom Source -> Map -> Filter -> Timestamps/Watermarks (4/10),5,Flink Task Threads] took 0 ms.

2020-06-24 21:55:41,721 DEBUG org.apache.flink.runtime.state.AbstractSnapshotStrategy       - DefaultOperatorStateBackend snapshot (FsCheckpointStorageLocation {fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@4a4b11cd, checkpointDirectory=hdfs://xxxxchk-316, sharedStateDirectory=hdfs://xxxxshared, taskOwnedStateDirectory=hdfs://xxxxtaskowned, metadataFilePath=hdfs://xxxxchk-316/_metadata, reference=(default), fileStateSizeThreshold=1024, writeBufferSize=4096}, synchronous part) in thread Thread[Source: Custom Source -> Map -> Filter -> Timestamps/Watermarks (4/10),5,Flink Task Threads] took 0 ms.

2020-06-24 21:55:41,721 DEBUG org.apache.flink.runtime.state.AbstractSnapshotStrategy       - DefaultOperatorStateBackend snapshot (FsCheckpointStorageLocation {fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@4a4b11cd, checkpointDirectory=hdfs://xxxxchk-316, sharedStateDirectory=hdfs://xxxxshared, taskOwnedStateDirectory=hdfs://xxxxtaskowned, metadataFilePath=hdfs://xxxxchk-316/_metadata, reference=(default), fileStateSizeThreshold=1024, writeBufferSize=4096}, synchronous part) in thread Thread[Source: Custom Source -> Map -> Filter -> Timestamps/Watermarks (4/10),5,Flink Task Threads] took 0 ms.

2020-06-24 21:55:41,721 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask           - Finished synchronous checkpoints for checkpoint 316 on task Source: Custom Source -> Map -> Filter -> Timestamps/Watermarks (4/10)

2020-06-24 21:55:41,721 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask           - Source: Custom Source -> Map -> Filter -> Timestamps/Watermarks (4/10) - finished synchronous part of checkpoint 316. Alignment duration: 0 ms, snapshot duration 0 ms

2020-06-24 21:55:41,737 DEBUG org.apache.flink.runtime.state.AbstractSnapshotStrategy       - DefaultOperatorStateBackend snapshot (FsCheckpointStorageLocation {fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@4a4b11cd, checkpointDirectory=hdfs://xxxxchk-316, sharedStateDirectory=hdfs://xxxxshared, taskOwnedStateDirectory=hdfs://xxxxtaskowned, metadataFilePath=hdfs://xxxxchk-316/_metadata, reference=(default), fileStateSizeThreshold=1024, writeBufferSize=4096}, asynchronous part) in thread Thread[AsyncOperations-thread-316,5,Flink Task Threads] took 16 ms.

2020-06-24 21:55:41,738 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask           - Source: Custom Source -> Map -> Filter -> Timestamps/Watermarks (4/10) - finished asynchronous part of checkpoint 316. Asynchronous duration: 16 ms

2020-06-24 21:55:42,008 DEBUG org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner  - Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger, anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10) (a608e8a1ab75622dc57f164bdbb86743): Received barrier from channel 3.

2020-06-24 21:55:42,008 DEBUG org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner  - Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger, anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10) (a608e8a1ab75622dc57f164bdbb86743): Starting stream alignment for checkpoint 316.

2020-06-24 21:55:42,110 DEBUG org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner  - Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger, anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10) (a608e8a1ab75622dc57f164bdbb86743): Received barrier from channel 7.

2020-06-24 21:55:42,110 DEBUG org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner  - Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger, anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10) (a608e8a1ab75622dc57f164bdbb86743): Received barrier from channel 2.

2020-06-24 21:55:42,110 DEBUG org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner  - Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger, anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10) (a608e8a1ab75622dc57f164bdbb86743): Received barrier from channel 8.

2020-06-24 21:55:42,110 DEBUG org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner  - Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger, anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10) (a608e8a1ab75622dc57f164bdbb86743): Received barrier from channel 6.

2020-06-24 21:55:42,110 DEBUG org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner  - Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger, anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10) (a608e8a1ab75622dc57f164bdbb86743): Received barrier from channel 0.

2020-06-24 21:55:42,110 DEBUG org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner  - Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger, anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10) (a608e8a1ab75622dc57f164bdbb86743): Received barrier from channel 5.

2020-06-24 21:55:42,110 DEBUG org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner  - Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger, anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10) (a608e8a1ab75622dc57f164bdbb86743): Received barrier from channel 9.

2020-06-24 21:55:42,110 DEBUG org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner  - Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger, anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10) (a608e8a1ab75622dc57f164bdbb86743): Received barrier from channel 4.

2020-06-24 21:55:42,110 DEBUG org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner  - Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger, anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10) (a608e8a1ab75622dc57f164bdbb86743): Received barrier from channel 1.

2020-06-24 21:55:42,110 DEBUG org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner  - Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger, anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10) (a608e8a1ab75622dc57f164bdbb86743): Received all barriers, triggering checkpoint 316 at 1593004193363.

2020-06-24 21:55:42,110 DEBUG org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner  - Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger, anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10) (a608e8a1ab75622dc57f164bdbb86743): End of stream alignment, feeding buffered data back.

2020-06-24 21:55:42,110 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask           - Starting checkpoint (316) CHECKPOINT on task Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger, anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10)

2020-06-24 21:55:42,110 DEBUG org.apache.flink.runtime.state.AbstractSnapshotStrategy       - DefaultOperatorStateBackend snapshot (FsCheckpointStorageLocation {fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@61164119, checkpointDirectory=hdfs://xxxxchk-316, sharedStateDirectory=hdfs://xxxxshared, taskOwnedStateDirectory=hdfs://xxxxtaskowned, metadataFilePath=hdfs://xxxxchk-316/_metadata, reference=(default), fileStateSizeThreshold=1024, writeBufferSize=4096}, synchronous part) in thread Thread[Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger, anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10),5,Flink Task Threads] took 0 ms.

2020-06-24 21:55:42,110 DEBUG org.apache.flink.runtime.state.AbstractSnapshotStrategy       - DefaultOperatorStateBackend snapshot (FsCheckpointStorageLocation {fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@61164119, checkpointDirectory=hdfs://xxxxchk-316, sharedStateDirectory=hdfs://xxxxshared, taskOwnedStateDirectory=hdfs://xxxxtaskowned, metadataFilePath=hdfs://xxxxchk-316/_metadata, reference=(default), fileStateSizeThreshold=1024, writeBufferSize=4096}, synchronous part) in thread Thread[Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger, anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10),5,Flink Task Threads] took 0 ms.

2020-06-24 21:55:42,980 DEBUG org.apache.flink.runtime.state.AbstractSnapshotStrategy       - Asynchronous incremental RocksDB snapshot (FsCheckpointStorageLocation {fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@61164119, checkpointDirectory=hdfs://xxxx/chk-316, sharedStateDirectory=hdfs://xxxx/shared, taskOwnedStateDirectory=hdfs://xxxx/taskowned, metadataFilePath=hdfs://xxxx/chk-316/_metadata, reference=(default), fileStateSizeThreshold=1024, writeBufferSize=4096}, synchronous part) in thread Thread[Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger, anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10),5,Flink Task Threads] took 870 ms.

2020-06-24 21:55:42,981 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask           - Finished synchronous checkpoints for checkpoint 316 on task Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger, anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10)

2020-06-24 21:55:42,981 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask           - Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger, anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10) - finished synchronous part of checkpoint 316. Alignment duration: 101 ms, snapshot duration 870 ms

2020-06-24 21:55:42,981 DEBUG org.apache.flink.streaming.runtime.io.CachedBufferStorage     - Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger, anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10) (a608e8a1ab75622dc57f164bdbb86743): Size of buffered data: 98304 bytes

2020-06-24 21:55:42,981 DEBUG org.apache.flink.streaming.runtime.io.CachedBufferStorage     - Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger, anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10) (a608e8a1ab75622dc57f164bdbb86743): Finished feeding back buffered data.

2020-06-24 21:55:43,814 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor            - Received heartbeat request from e88ea2f790430c9c160e540ef0546d60.

2020-06-24 21:55:44,758 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor            - Received heartbeat request from b93d7167db364dfdcbda886944f1482f.

2020-06-24 21:55:45,714 DEBUG org.apache.flink.runtime.state.AbstractSnapshotStrategy       - Asynchronous incremental RocksDB snapshot (FsCheckpointStorageLocation {fileSystem=org.apache.flink.core.fs.SafetyNetWrapperFileSystem@61164119, checkpointDirectory=hdfs://xxxx/chk-316, sharedStateDirectory=hdfs://xxxx/shared, taskOwnedStateDirectory=hdfs://xxxx/taskowned, metadataFilePath=hdfs://xxxx/chk-316/_metadata, reference=(default), fileStateSizeThreshold=1024, writeBufferSize=4096}, asynchronous part) in thread Thread[AsyncOperations-thread-316,5,Flink Task Threads] took 2733 ms.

2020-06-24 21:55:45,715 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask           - Window(SlidingEventTimeWindows(10800000, 300000, 0), EventTimeTrigger, anon$11, ScalaProcessWindowFunctionWrapper) -> Map (10/10) - finished asynchronous part of checkpoint 316. Asynchronous duration: 2734 ms

2020-06-24 21:55:49,876 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner       - Memory usage stats: [HEAP: 220/2589/2589 MB, NON HEAP: 111/114/424 MB (used/committed/max)]

2020-06-24 21:55:49,876 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner       - Direct memory stats: Count: 17414, Total Capacity: 584128352, Used Memory: 584128353

2020-06-24 21:55:49,876 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner       - Off-heap pool stats: [Code Cache: 35/35/240 MB (used/committed/max)], [Metaspace: 67/69/96 MB (used/committed/max)], [Compressed Class Space: 8/9/88 MB (used/committed/max)]

2020-06-24 21:55:49,876 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner       - Garbage collector stats: [PS Scavenge, GC TIME (ms): 110525, GC COUNT: 7084], [PS MarkSweep, GC TIME (ms): 1074, GC COUNT: 6]
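
To put a number on the gap described in the question, the sketch below pairs the TaskExecutor "Trigger checkpoint" line with the first StreamTask "Starting checkpoint" line for the same checkpoint ID and reports the delay in seconds. The helper name checkpoint_start_delays is hypothetical, and the sketch assumes, as the post does, that the triggered execution is the source subtask whose "Starting checkpoint" line appears first.

```python
import re
from datetime import datetime

# The two relevant DEBUG lines for checkpoint 316, copied from the log above.
SAMPLE = [
    "2020-06-24 21:09:53,369 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor - "
    "Trigger checkpoint 316@1593004193363 for 84dce1ec8aa5a4df2d1758d6e9278693.",
    "2020-06-24 21:55:41,721 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - "
    "Starting checkpoint (316) CHECKPOINT on task Source: Custom Source -> Map -> Filter "
    "-> Timestamps/Watermarks (4/10)",
]

TRIGGER_RE = re.compile(r"Trigger checkpoint (\d+)@")
START_RE = re.compile(r"Starting checkpoint \((\d+)\)")


def checkpoint_start_delays(lines):
    """Map checkpoint ID -> seconds from 'Trigger checkpoint' to the first 'Starting checkpoint'."""
    triggered = {}
    delays = {}
    for line in lines:
        try:
            ts = datetime.strptime(line[:23], "%Y-%m-%d %H:%M:%S,%f")
        except ValueError:
            continue  # not a timestamped log line (blank line, wrapped text, ...)
        match = TRIGGER_RE.search(line)
        if match:
            # Keep the earliest trigger time seen for this checkpoint.
            triggered.setdefault(int(match.group(1)), ts)
            continue
        match = START_RE.search(line)
        if match:
            cp_id = int(match.group(1))
            # Only the first start after a known trigger counts.
            if cp_id in triggered and cp_id not in delays:
                delays[cp_id] = (ts - triggered[cp_id]).total_seconds()
    return delays


if __name__ == "__main__":
    for cp_id, seconds in checkpoint_start_delays(SAMPLE).items():
        print(f"checkpoint {cp_id}: {seconds:.3f} s between trigger and start")
```

For checkpoint 316 this gives about 2748 s (roughly 45 minutes 48 seconds) between trigger and start on the source task, whereas the synchronous and asynchronous snapshot phases logged afterwards took at most a few seconds.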





-- 
**************************************
 tivanli
**************************************