Posted to user-zh@flink.apache.org by restart <ne...@outlook.com> on 2020/10/09 03:24:25 UTC

Checkpoint failures causing the watermark to stop updating

Hi all, I'd like to ask for advice on a problem:
   The scenario is this: Flink consumes from Kafka, and after cleansing the data is aggregated at minute, hour, and day granularity (the hourly aggregation takes the minute-level aggregation output as its input, and the daily level works the same way), then written to ES. However, after the job has been running for a few hours, large numbers of checkpoints start to fail and at the same time the watermark stops updating. I have adjusted the checkpoint-related parameters several times and also followed an analysis found online (http://pangongsen.com/2018/04/25/Flink%E6%B0%B4%E4%BD%8D%E7%BA%BF%E4%B8%8D%E8%A7%A6%E5%8F%91%E7%9A%84%E5%87%A0%E7%82%B9%E6%80%BB%E7%BB%93/),
but still have not found the root cause. I have not been using Flink for long and am somewhat at a loss.
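For reference, a rough runnable sketch of the cascaded pipeline described above. The Tuple3 record shape, the key selector, the reduce functions, the 10-second out-of-orderness bound, and the print sinks are placeholders, not the actual job code; the real job uses the Kafka source, the cleansing logic, and ES sinks.

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class CascadedAggregationSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // In the real job this stream comes from a FlinkKafkaConsumer plus the cleansing step;
        // a tiny in-memory stream of (key, eventTimeMillis, count) stands in for it here.
        DataStream<Tuple3<String, Long, Long>> cleaned = env
                .fromElements(
                        Tuple3.of("a", 1_000L, 1L),
                        Tuple3.of("a", 2_000L, 1L),
                        Tuple3.of("b", 3_000L, 1L))
                .assignTimestampsAndWatermarks(
                        new BoundedOutOfOrdernessTimestampExtractor<Tuple3<String, Long, Long>>(Time.seconds(10)) {
                            @Override
                            public long extractTimestamp(Tuple3<String, Long, Long> t) {
                                return t.f1; // event-time field, in epoch milliseconds
                            }
                        });

        // Minute-level aggregation: 1-minute event-time tumbling windows, summing the count per key.
        DataStream<Tuple3<String, Long, Long>> perMinute = cleaned
                .keyBy(t -> t.f0)
                .window(TumblingEventTimeWindows.of(Time.minutes(1)))
                .reduce((a, b) -> Tuple3.of(a.f0, Math.max(a.f1, b.f1), a.f2 + b.f2));

        // Hour-level aggregation consumes the minute-level output (window results carry the
        // window end as their timestamp, so the cascade works); the day level is built the
        // same way on top of the hourly stream.
        DataStream<Tuple3<String, Long, Long>> perHour = perMinute
                .keyBy(t -> t.f0)
                .window(TumblingEventTimeWindows.of(Time.hours(1)))
                .reduce((a, b) -> Tuple3.of(a.f0, Math.max(a.f1, b.f1), a.f2 + b.f2));

        // Each level is written to Elasticsearch in the real job; print() stands in here.
        perMinute.print();
        perHour.print();
        env.execute("cascaded-aggregation-sketch");
    }
}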
task manager log:
2020-09-30 15:03:19,708 INFO  org.apache.hadoop.hdfs.DFSClient - Could not complete /flink/state/error/5d895e1de420b44791b0850c23004b0e/chk-41/5991fd64-9d2c-4ac2-b08f-859efba879ee retrying...
2020-09-30 15:13:25,236 WARN  org.apache.hadoop.hdfs.DataStreamer - Slow waitForAckedSeqno took 38022ms (threshold=30000ms). File being written: /flink/state/infrastructure_dep/error/5d895e1de420b44791b0850c23004b0e/shared/57f53e0d-3d58-4a5a-893d-50667065d975, block: BP-1864147273-172.20.3.102-1555051764064:blk_2307424648_1233776433, Write pipeline datanodes: [DatanodeInfoWithStorage[172.20.1.61:1004,DS-e84bae2e-224a-4826-8f65-3a4bd3bd481a,DISK], DatanodeInfoWithStorage[172.20.1.27:1004,DS-c69e89a7-88cd-45f0-b261-57fe9fb1c7d8,DISK], DatanodeInfoWithStorage[172.20.1.24:1004,DS-cd0a08e6-ca5c-4d56-8c9d-fdd331d0b270,DISK]].
Checkpoint-related code:
        RocksDBStateBackend rocksDBStateBackend =
                new RocksDBStateBackend(stateBackendConfig.getCheckpointDataUri(), true); // true = incremental checkpoints
        rocksDBStateBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStateBackend((StateBackend) rocksDBStateBackend);
        env.enableCheckpointing(5 * 60000, CheckpointingMode.EXACTLY_ONCE);  // checkpoint every 5 minutes
        env.getCheckpointConfig().setCheckpointTimeout(900000);              // 15-minute checkpoint timeout
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(60000);      // at least 1 minute between checkpoints
        env.getCheckpointConfig().setFailOnCheckpointingErrors(false);       // do not fail the job on checkpoint errors
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
The watermark is assigned with BoundedOutOfOrdernessTimestampExtractor.
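As a side note, the watermark actually reaching an operator can also be logged with a small pass-through probe; this is purely a diagnostic sketch, not part of the actual job, meant to sit right after the `cleaned` stream in the sketch above (same Tuple3 element type, two extra imports noted in the comment).

// Needs: import org.apache.flink.streaming.api.functions.ProcessFunction;
//        import org.apache.flink.util.Collector;
// Diagnostic sketch: forward elements unchanged and log the operator's current watermark
// whenever it moves, to confirm whether the watermark still advances at this point in the
// pipeline once checkpoints start failing.
DataStream<Tuple3<String, Long, Long>> probed = cleaned.process(
        new ProcessFunction<Tuple3<String, Long, Long>, Tuple3<String, Long, Long>>() {
            private long lastLogged = Long.MIN_VALUE;

            @Override
            public void processElement(Tuple3<String, Long, Long> value,
                                       Context ctx,
                                       Collector<Tuple3<String, Long, Long>> out) {
                long wm = ctx.timerService().currentWatermark();
                if (wm != lastLogged) {
                    // goes to taskmanager.out; a proper logger could be used instead
                    System.out.println("current watermark = " + wm);
                    lastLogged = wm;
                }
                out.collect(value); // forward the element unchanged
            }
        });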




Re: Checkpoint failures causing the watermark to stop updating

Posted by restart <ne...@outlook.com>.
Thanks for the reply. I have been keeping an eye on backpressure: whenever checkpoint failures pile up, the backpressure shown in the Flink web UI is indeed high. My thinking was that even with heavy downstream backpressure, as long as the path to the ES sink is open, data should in theory still flow downstream, just more slowly, and the watermark would update sooner or later; that is why I kept my attention on the checkpoint side. Following your suggestion, I will tackle the backpressure first and see how things go.
Thanks again.




Re: Checkpoint failures causing the watermark to stop updating

Posted by shizk233 <wa...@gmail.com>.
Hi, this looks like it is caused by backpressure: when the data stream is backpressured, operators cannot process the checkpoint barriers in time, and watermark messages likewise cannot be forwarded to downstream operators.

I suggest first observing the backpressure status [1]; if that is indeed the case, then optimize whatever is the source of the backpressure.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/monitoring/back_pressure.html
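
For reference, the same back-pressure status the web UI shows can also be polled through the Flink REST API. A minimal sketch follows; the REST address and the vertex id are placeholders (vertex ids can be listed via GET /jobs/<jobId>), and the job id is the one appearing in the HDFS paths in the log above.

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class BackPressureCheck {
    public static void main(String[] args) throws Exception {
        String restAddress = "http://localhost:8081";            // assumed JobManager REST address
        String jobId = "5d895e1de420b44791b0850c23004b0e";       // job id seen in the HDFS paths above
        String vertexId = "<vertex-id>";                         // placeholder; list vertices via GET /jobs/<jobId>

        URL url = new URL(restAddress + "/jobs/" + jobId + "/vertices/" + vertexId + "/backpressure");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("GET");

        StringBuilder body = new StringBuilder();
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                body.append(line);
            }
        } finally {
            conn.disconnect();
        }

        // The JSON response reports a "backpressure-level" (ok / low / high) for the vertex and
        // its subtasks; a vertex showing "high" is being slowed down by its downstream operators,
        // which helps locate the bottleneck.
        System.out.println(body);
    }
}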


restart <ne...@outlook.com> wrote on Friday, October 9, 2020 at 11:48 AM:

> [quoted original message omitted; identical to the first post above]