Posted to user-zh@flink.apache.org by 曹武 <14...@163.com> on 2020/07/16 12:04:16 UTC

flink 1.11 checkpoint usage

I am using the Flink 1.11.0 DDL/SQL API with the debezium-json format for CDC. After restoring from a checkpoint, newly arriving records with op=d fail to be deleted from the sink.
Restart command: ./bin/flink run -m yarn-cluster  /root/bigdata-flink-1.0.jar -s hdfs://prehadoop01:8020/flink/checkpoints/4cc5df8b96e90c1c2a4d3719a77f51d1/chk-819/_metadata
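For reference, the flink run action treats everything after the job jar as arguments to the program's main method, so CLI options such as -s/--fromSavepoint are usually placed before the jar. A sketch of the same command in that form (paths unchanged, purely illustrative):

./bin/flink run -m yarn-cluster \
    -s hdfs://prehadoop01:8020/flink/checkpoints/4cc5df8b96e90c1c2a4d3719a77f51d1/chk-819/_metadata \
    /root/bigdata-flink-1.0.jar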
Code:   EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(6000L); // checkpoint timeout
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // maximum number of concurrent checkpoints
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10L); // minimum pause between checkpoints
        env.getCheckpointConfig().setPreferCheckpointForRecovery(true); // prefer checkpoints for failure recovery
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1); // number of tolerable checkpoint failures
        // cleanup policy for externalized checkpoint files
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // external checkpoint directory
        env.setStateBackend(new FsStateBackend(new URI("hdfs://172.22.20.205:8020/flink/checkpoints"), false));
        // TimeUnit.MINUTES), Time.of(10, TimeUnit.SECONDS))); -- truncated line in the original mail
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
        String sourceDDL = String.format(
                "CREATE TABLE debezium_source (" +
                        " id INT NOT NULL," +
                        " name STRING," +
                        " description STRING," +
                        " weight Double" +
                        ") WITH (" +
                        " 'connector' = 'kafka-0.11'," +
                        " 'topic' = '%s'," +
                        " 'properties.bootstrap.servers' = '%s'," +
                        " 'scan.startup.mode' = 'group-offsets'," +
                        " 'format' = 'debezium-json'" +
                        ")", "ddd", " 172.22.20.206:9092");
        String sinkDDL = "CREATE TABLE sink (" +
                " id INT NOT NULL," +
                " name STRING," +
                " description STRING," +
                " weight Double," +
                " PRIMARY KEY (id,name, description,weight) NOT ENFORCED " +
                ") WITH (" +
                " 'connector' = 'jdbc'," +
                " 'url' =
'jdbc:mysql://172.27.4.22:3306/test?autoReconnect=true'," +
                " 'table-name' = 'products'," +
                " 'driver'= 'com.mysql.cj.jdbc.Driver'," +
                " 'username'='DataPip'," +
                " 'password'='DataPip'" +
                ")";
        String dml = "INSERT INTO sink SELECT id, name, description, weight FROM debezium_source GROUP BY id, name, description, weight";
        tEnv.executeSql(sourceDDL);
        tEnv.executeSql(sinkDDL);
        tEnv.executeSql(dml);
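The commented-out fragment "TimeUnit.MINUTES), Time.of(10, TimeUnit.SECONDS)));" near the state backend setup appears to be the tail of a restart-strategy statement that was truncated in the mail. A guess at what the full line may have looked like; the concrete values (3 failures, 5 minutes, 10 seconds) are placeholders and not taken from the original:

        // hypothetical reconstruction of the truncated restart-strategy line; all values are placeholders
        env.setRestartStrategy(RestartStrategies.failureRateRestart(
                3, Time.of(5, TimeUnit.MINUTES), Time.of(10, TimeUnit.SECONDS)));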




Re: flink 1.11 checkpoint usage

Posted by 曹武 <14...@163.com>.
If I remove the GROUP BY, the following exception is thrown. Is there a known solution for this exception?
Exception in thread "main" org.apache.flink.table.api.TableException:
Provided trait [BEFORE_AND_AFTER] can't satisfy required trait
[ONLY_UPDATE_AFTER]. This is a bug in planner, please file an issue.
Current node is TableSourceScan(table=[[default_catalog, default_database,
ddd]], fields=[id, age])
        at
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyUpdateKindTraitVisitor.$anonfun$visitChildren$2(FlinkChangelogModeInferenceProgram.scala:626)
        at
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
        at scala.collection.Iterator.foreach(Iterator.scala:937)
        at scala.collection.Iterator.foreach$(Iterator.scala:937)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
        at scala.collection.IterableLike.foreach(IterableLike.scala:70)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableLike.map(TraversableLike.scala:233)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyUpdateKindTraitVisitor.visitChildren(FlinkChangelogModeInferenceProgram.scala:614)
        at
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyUpdateKindTraitVisitor.$anonfun$visitSink$1(FlinkChangelogModeInferenceProgram.scala:690)
        at
scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:240)


godfrey he wrote
> Why GROUP BY id, name, description, weight?
> Wouldn't a plain "INSERT INTO sink SELECT id, name, description, weight FROM
> debezium_source" meet the requirement?
> 
> 曹武 <14701319164@> wrote on Thursday, July 16, 2020, at 9:30 PM:






Re: flink 1.11 checkpoint usage

Posted by Leonard Xu <xb...@gmail.com>.
Hi, 曹武

This is a known bug; it has already been fixed in 1.11.1 and 1.12.0.

If you need it urgently, you can build the release-1.11 branch yourself.

Best,
Leonard Xu

https://issues.apache.org/jira/browse/FLINK-18461
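A minimal sketch of building that branch from source, assuming Git and Maven are available (standard Flink build steps):

git clone https://github.com/apache/flink.git
cd flink
git checkout release-1.11
mvn clean install -DskipTests
# the packaged distribution typically ends up under flink-dist/target/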

> On July 17, 2020, at 17:12, 曹武 <14...@163.com> wrote:
> 
> It seems like either the restore from the checkpoint failed, or the checkpoint files do not contain the GROUP BY intermediate state. How can I troubleshoot this?

Re: flink 1.11 checkpoint usage

Posted by 曹武 <14...@163.com>.
It seems like either the restore from the checkpoint failed, or the checkpoint files do not contain the GROUP BY intermediate state. How can I troubleshoot this?
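One way to check whether the restarted job actually restored from that checkpoint is the JobManager's checkpoint statistics, either on the web UI's "Checkpoints" tab or via the REST API. A sketch, where host, port, and job id are placeholders:

# the "latest" section of the response reports the restored checkpoint, if any
curl http://<jobmanager-host>:8081/jobs/<job-id>/checkpoints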

godfrey he wrote
> Why GROUP BY id, name, description, weight?
> Wouldn't a plain "INSERT INTO sink SELECT id, name, description, weight FROM
> debezium_source" meet the requirement?
> 
> 曹武 <14701319164@> wrote on Thursday, July 16, 2020, at 9:30 PM:






Re: flink 1.11 checkpoint usage

Posted by 曹武 <14...@163.com>.
If I remove the GROUP BY, the following exception is thrown. Is there a known solution for this exception?
Exception in thread "main" org.apache.flink.table.api.TableException:
Provided trait [BEFORE_AND_AFTER] can't satisfy required trait
[ONLY_UPDATE_AFTER]. This is a bug in planner, please file an issue.
Current node is TableSourceScan(table=[[default_catalog, default_database,
ddd]], fields=[id, age])
        at
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyUpdateKindTraitVisitor.$anonfun$visitChildren$2(FlinkChangelogModeInferenceProgram.scala:626)
        at
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
        at scala.collection.Iterator.foreach(Iterator.scala:937)
        at scala.collection.Iterator.foreach$(Iterator.scala:937)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
        at scala.collection.IterableLike.foreach(IterableLike.scala:70)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableLike.map(TraversableLike.scala:233)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyUpdateKindTraitVisitor.visitChildren(FlinkChangelogModeInferenceProgram.scala:614)
        at
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyUpdateKindTraitVisitor.$anonfun$visitSink$1(FlinkChangelogModeInferenceProgram.scala:690)
        at
scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:240)






Jark wrote
> Hi,
> 
> Can you confirm that Kafka contains the complete snapshot (full) data? That is, is
> there a corresponding INSERT message before this DELETE message?
> If not, this behavior can indeed occur (the DELETE is treated as dirty data at the
> GROUP BY node and dropped).
> Alternatively, as godfrey suggested, skip the GROUP BY and INSERT all columns
> directly into the sink; then the DELETE will not be discarded.
> 
> Best,
> Jark
> 
> On Thu, 16 Jul 2020 at 21:56, godfrey he <godfreyhe@> wrote:
> 
>> Why GROUP BY id, name, description, weight?
>> Wouldn't a plain "INSERT INTO sink SELECT id, name, description, weight FROM
>> debezium_source" meet the requirement?






Re: flink 1.11 checkpoint usage

Posted by Jark Wu <im...@gmail.com>.
Hi,

Can you confirm that Kafka contains the complete snapshot (full) data? That is, is there a corresponding INSERT message before this DELETE message?
If not, this behavior can indeed occur (the DELETE is treated as dirty data at the GROUP BY node and dropped).
Alternatively, as godfrey suggested, skip the GROUP BY and INSERT all columns directly into the sink; then the DELETE will not be discarded.
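Written against the tables defined in the original post, that direct insert without the GROUP BY is simply:

INSERT INTO sink
SELECT id, name, description, weight
FROM debezium_source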

Best,
Jark

On Thu, 16 Jul 2020 at 21:56, godfrey he <go...@gmail.com> wrote:

> Why GROUP BY id, name, description, weight?
> Wouldn't a plain "INSERT INTO sink SELECT id, name, description, weight FROM
> debezium_source" meet the requirement?
>
> 曹武 <14...@163.com> wrote on Thursday, July 16, 2020, at 9:30 PM:

Re: flink 1.11 checkpoint usage

Posted by godfrey he <go...@gmail.com>.
Why GROUP BY id, name, description, weight?
Wouldn't a plain "INSERT INTO sink SELECT id, name, description, weight FROM
debezium_source" meet the requirement?

曹武 <14...@163.com> wrote on Thursday, July 16, 2020, at 9:30 PM:
