Posted to user-zh@flink.apache.org by 徐战辉 <wa...@163.com> on 2022/05/12 02:38:33 UTC

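Question about job parameter settings for Flink SQL job failure and cancellation (flink-1.14.2)

Hello, I would like to ask how to set the Flink configuration and job parameters so that no data is lost when a job is cancelled and redeployed, or when a job fails and is rerun.

I currently have a job with checkpointing enabled. After I cancel it and start it again, I find that a small portion of the data is lost.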




1. flink-conf.yaml


execution.checkpointing.interval: 10000
execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
execution.checkpointing.checkpoints-after-tasks-finish.enabled: true

state.backend: filesystem
state.checkpoints.dir: hdfs://******:8020/flink/checkpoints
state.savepoints.dir: hdfs://****:8020/flink/savepoints


2. source table
CREATE TABLE source_kafka_nginxlog (
 ts BIGINT,
 ......
 pt AS PROCTIME()
) WITH (
 'connector' = 'kafka',
 'topic' = 'nginxlog',
-- The patch targeting Flink 1.15 (FLINK-24697) has been applied

 'scan.startup.mode' = 'group-offsets',
 'properties.auto.offset.reset' = 'latest', 

 'properties.bootstrap.servers' = '***:9092',
 'properties.group.id' = 'zep',
 'format'='json'
);


3. sink table



CREATE TABLE sink_kafka_nginxlog_statistic (
 ts BIGINT,
  ......
 clt_rq BIGINT not null
) WITH (
 'connector' = 'kafka',
 'topic' = 'nginxlog-statistic-flink',
 'sink.parallelism' = '20',
 'sink.delivery-guarantee' = 'exactly-once',
 'sink.transactional-id-prefix' = 'nginxlog-statistic-flink',
 'properties.transaction.timeout.ms' = '3600000',
 'scan.startup.mode' = 'group-offsets',
 'properties.auto.offset.reset' = 'latest',
 'properties.bootstrap.servers' = '***:9092',
 'properties.group.id' = 'zep',
 'value.format' = 'csv'
)




Jerry Guo
wangyixuhongming@163.com

Re: Question about job parameter settings for Flink SQL job failure and cancellation (flink-1.14.2)

Posted by RS <ti...@163.com>.
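Hi,
You need to take a savepoint when you cancel the job and then specify that savepoint when you start it again; that way data should not be lost. If you cancel the job directly, data can be lost.
Checkpoints may not work the way you expect, so take another look at how they behave.

Thx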
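
The advice above can be sketched as follows for a job submitted through the Flink SQL client. This is only a sketch under assumptions: the job is first stopped with a savepoint (for example with `bin/flink stop --savepointPath <savepoint-dir> <job-id>`), and `<savepoint-path>` is a placeholder for the path that command reports, not a value taken from the original post.

```sql
-- Placeholder: replace <savepoint-path> with the savepoint path reported
-- when the job was stopped.
SET 'execution.savepoint.path' = '<savepoint-path>';

-- Resubmit the job's original INSERT INTO statement here. It is not shown
-- in the original post, so it is left out. The restarted job restores its
-- state, including the Kafka source offsets, from the savepoint.

-- Clear the setting afterwards so that later jobs submitted in the same
-- session do not restore from the same savepoint.
RESET 'execution.savepoint.path';
```

For a job packaged as a JAR, the corresponding step would be passing the savepoint path with `bin/flink run -s <savepoint-path>`.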






