You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by 徐战辉 <wa...@163.com> on 2022/05/12 07:36:58 UTC

回复: 转发:基于flink sql作业失败与取消状况下的作业参数设置咨询(flink-1.14.2)


        hi, Yuxia,   
        这边是想咨询下,如何设置flink配置及作业参数,在取消作业重新部署、flink作业失败重跑情况下,保证不丢失数据。

        目前有一份作业,开启checkpoint,  cancel 后重新启动,发现数据会丢失1小部分。




1. flink.conf


execution.checkpointing.interval: 10000
execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
execution.checkpointing.checkpoints-after-tasks-finish.enabled: true

state.backend: filesystem
state.checkpoints.dir: hdfs://******:8020/flink/checkpoints
state.savepoints.dir: hdfs://****:8020/flink/savepoints


2. source table
CREATE TABLE source_kafka_nginxlog (
 ts BIGINT,
 ......
 pt AS PROCTIME()
) WITH (
 'connector' = 'kafka',
 'topic' = 'nginxlog',
-- 有将flink 1.15针对的补丁(FLINK-24697)打上

 'scan.startup.mode' = 'group-offsets',
 'properties.auto.offset.reset' = 'latest', 

 'properties.bootstrap.servers' = '***:9092',
 'properties.group.id' = 'zep',
 'format'='json'
);


3. sink table



CREATE TABLE sink_kafka_nginxlog_statistic (
 ts BIGINT,
  ......
 clt_rq BIGINT not null
) WITH (
 'connector' = 'kafka',
 'topic' = 'nginxlog-statistic-flink',
 'sink.parallelism' = '20',
 'sink.delivery-guarantee' = 'exactly-once',
 'sink.transactional-id-prefix' = 'nginxlog-statistic-flink',
 'properties.transaction.timeout.ms' = '3600000',
 'scan.startup.mode' = 'group-offsets',
 'properties.auto.offset.reset' = 'latest', 
 'properties.bootstrap.servers' = '***:9092',
 'properties.group.id' = 'zep',
 'value.format' = 'csv'
)
Best Regards

| |
Jerry Guo
|
|
wangyixuhongming@163.com
|
---- 回复的原邮件 ----
| 发件人 | yuxia<lu...@alumni.sjtu.edu.cn> |
| 发送日期 | 2022年5月12日 15:16 |
| 收件人 | user-zh<us...@flink.apache.org> |
| 主题 | Re: 转发:基于flink sql作业失败与取消状况下的作业参数设置咨询(flink-1.14.2) |
hi,可以解释一下具体是想咨询什么问题?

Best regards,
Yuxia

----- 原始邮件 -----
发件人: "徐战辉" <wa...@163.com>
收件人: "user-zh" <us...@flink.apache.org>
发送时间: 星期四, 2022年 5 月 12日 上午 10:53:00
主题: 转发:基于flink sql作业失败与取消状况下的作业参数设置咨询(flink-1.14.2)

| |
Jerry Guo
|
|
wangyixuhongming@163.com
|
---- 转发的原邮件 ----
| 发件人 | 徐战辉<wa...@163.com> |
| 发送日期 | 2022年5月12日 10:38 |
| 收件人 | user-zh@flink.apache.org<us...@flink.apache.org> |
| 主题 | 基于flink sql作业失败与取消状况下的作业参数设置咨询(flink-1.14.2) |

回复:基于flink sql作业失败与取消状况下的作业参数设置咨询(flink-1.14.2)

Posted by 徐战辉 <wa...@163.com>.
准备看些文档再尝试下,该问题撤回,多谢,
done.

在2022年05月12日 15:36,徐战辉 写道:


        hi, Yuxia,   
        这边是想咨询下,如何设置flink配置及作业参数,在取消作业重新部署、flink作业失败重跑情况下,保证不丢失数据。

        目前有一份作业,开启checkpoint,  cancel 后重新启动,发现数据会丢失1小部分。




1. flink.conf


execution.checkpointing.interval: 10000
execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
execution.checkpointing.checkpoints-after-tasks-finish.enabled: true

state.backend: filesystem
state.checkpoints.dir: hdfs://******:8020/flink/checkpoints
state.savepoints.dir: hdfs://****:8020/flink/savepoints


2. source table
CREATE TABLE source_kafka_nginxlog (
 ts BIGINT,
 ......
 pt AS PROCTIME()
) WITH (
 'connector' = 'kafka',
 'topic' = 'nginxlog',
-- 有将flink 1.15针对的补丁(FLINK-24697)打上

 'scan.startup.mode' = 'group-offsets',
 'properties.auto.offset.reset' = 'latest', 

 'properties.bootstrap.servers' = '***:9092',
 'properties.group.id' = 'zep',
 'format'='json'
);


3. sink table



CREATE TABLE sink_kafka_nginxlog_statistic (
 ts BIGINT,
  ......
 clt_rq BIGINT not null
) WITH (
 'connector' = 'kafka',
 'topic' = 'nginxlog-statistic-flink',
 'sink.parallelism' = '20',
 'sink.delivery-guarantee' = 'exactly-once',
 'sink.transactional-id-prefix' = 'nginxlog-statistic-flink',
 'properties.transaction.timeout.ms' = '3600000',
 'scan.startup.mode' = 'group-offsets',
 'properties.auto.offset.reset' = 'latest', 
 'properties.bootstrap.servers' = '***:9092',
 'properties.group.id' = 'zep',
 'value.format' = 'csv'
)
Best Regards

| |
Jerry Guo
|
|
wangyixuhongming@163.com
|
---- 回复的原邮件 ----
| 发件人 | yuxia<lu...@alumni.sjtu.edu.cn> |
| 发送日期 | 2022年5月12日 15:16 |
| 收件人 | user-zh<us...@flink.apache.org> |
| 主题 | Re: 转发:基于flink sql作业失败与取消状况下的作业参数设置咨询(flink-1.14.2) |
hi,可以解释一下具体是想咨询什么问题?

Best regards,
Yuxia

----- 原始邮件 -----
发件人: "徐战辉" <wa...@163.com>
收件人: "user-zh" <us...@flink.apache.org>
发送时间: 星期四, 2022年 5 月 12日 上午 10:53:00
主题: 转发:基于flink sql作业失败与取消状况下的作业参数设置咨询(flink-1.14.2)

| |
Jerry Guo
|
|
wangyixuhongming@163.com
|
---- 转发的原邮件 ----
| 发件人 | 徐战辉<wa...@163.com> |
| 发送日期 | 2022年5月12日 10:38 |
| 收件人 | user-zh@flink.apache.org<us...@flink.apache.org> |
| 主题 | 基于flink sql作业失败与取消状况下的作业参数设置咨询(flink-1.14.2) |