Posted to user-zh@flink.apache.org by 左岩 <13...@163.com> on 2022/11/04 06:34:52 UTC

FlinkCDC can read MySQL change data but cannot insert it into a new MySQL table

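With FlinkCDC I can read the MySQL change data, but I cannot insert it into a new MySQL table. Both tables have primary keys and identical schemas.
The code is below; see the attachment for the console output: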
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.setInteger("rest.port", 10041);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
    StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

    env.enableCheckpointing(60 * 1000 * 5, CheckpointingMode.EXACTLY_ONCE);
    env.getCheckpointConfig().setCheckpointStorage("file:///d:/checkpoint3");

    env.setParallelism(1);
    // Create the source table
    tenv.executeSql("CREATE TABLE flink_t_stu ( " +
            "      userid INT, " +
            "      username string, " +
            "      age string, " +
            "      `partition` INT, " +
            "     PRIMARY KEY(userid) NOT ENFORCED " +
            "     ) WITH ( " +
            "     'connector' = 'mysql-cdc', " +
            "     'server-id' = '5401-5404', " +
            "     'scan.startup.mode' = 'latest-offset', " +
            // "     'scan.startup.mode' = 'earliest-offset', " +
            "     'hostname' = '192.168.0.220', " +
            "     'port' = '3306', " +
            "     'username' = 'root', " +
            "     'password' = 'root', " +
            "     'database-name' = 'zy', " +
            "     'table-name' = 't_stu' " +
            ")");

    // Query
    tenv.executeSql("select * from flink_t_stu").print();

    // Create a target table to hold the query results
    tenv.executeSql(
            "CREATE TABLE flink_t_stu2 ( " +
            "      userid INT, " +
            "      username string, " +
            "      age string, " +
            "      `partition` INT, " +
            "     PRIMARY KEY(userid) NOT ENFORCED " +
            "     ) WITH ( " +
            "  'connector' = 'jdbc', " +
            "  'url' = 'jdbc:mysql://192.168.0.220:3306/zy', " +
            "  'table-name' = 't_stu2', " +
            "  'username' = 'root', " +
            "  'password' = 'root'  " +
            ")"
    );

    tenv.executeSql("INSERT INTO flink_t_stu2 " +
            "SELECT * FROM flink_t_stu");
    env.execute();
}

Re:Re: Re: FlinkCDC can read MySQL change data but cannot insert it into a new MySQL table

Posted by 左岩 <13...@163.com>.
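The CDC version is 2.3, which I compiled myself against Flink 1.14. There is one more problem: the change data can be read, but only with a delay. For example, if I write to MySQL at 11:30, Flink CDC picks up the newly written or changed rows only 5 to 7 minutes later. The second problem is the one from before: the change data cannot be inserted into the other MySQL table.
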
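On 2022-11-07 10:11:56, "Shengkai Fang" <fs...@gmail.com> wrote:
>Which CDC version are you using? The incremental data can only be read after the full snapshot has been read completely.
>
>Best,
>Shengkai
>
>左岩 <13...@163.com> wrote on Fri, Nov 4, 2022 at 17:58:
>
>> Removing .print(); doesn't help either, so it has nothing to do with that (running 2 DMLs in one tableEnv). There is also a new symptom: after I change data in the MySQL table, Flink CDC takes a long time to pick up the changes. What could be causing this?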

Re: Re: FlinkCDC can read MySQL change data but cannot insert it into a new MySQL table

Posted by Shengkai Fang <fs...@gmail.com>.
Which CDC version are you using? The incremental data can only be read after the full snapshot has been read completely.

Best,
Shengkai
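One possible explanation for the 5 to 7 minute delay, offered as an assumption rather than a confirmed diagnosis: as I understand Flink CDC 2.x, the MySQL source moves from the snapshot phase to binlog reading only after a checkpoint has completed, and the original job checkpoints every 60 * 1000 * 5 ms, i.e. every 5 minutes. I have not verified whether this also applies with 'scan.startup.mode' = 'latest-offset'. The sketch below changes only the checkpoint interval; the 10-second value and the class and method names are illustrative and do not come from the thread.

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ShorterCheckpointInterval {
    // Illustrative value only; the original post used 60 * 1000 * 5 ms (5 minutes).
    static final long CHECKPOINT_INTERVAL_MS = 10_000L;

    static StreamExecutionEnvironment createEnv() {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 10041);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        // Assumption: the CDC source waits for a completed checkpoint before reading the binlog,
        // so a shorter interval should shorten that initial wait.
        env.enableCheckpointing(CHECKPOINT_INTERVAL_MS, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointStorage("file:///d:/checkpoint3");
        env.setParallelism(1);
        return env;
    }

    public static void main(String[] args) {
        StreamExecutionEnvironment env = createEnv();
        // Prints the configured interval in milliseconds.
        System.out.println(env.getCheckpointConfig().getCheckpointInterval());
    }
}
```

A very short interval adds checkpointing overhead, so the right value is a trade-off to tune rather than a fixed recommendation.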


Re:Re: FlinkCDC can read MySQL change data but cannot insert it into a new MySQL table

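Posted by 左岩 <13...@163.com>.

Removing .print(); doesn't help either, so it has nothing to do with that (running 2 DMLs in one tableEnv). There is also a new symptom: after I change data in the MySQL table, Flink CDC takes a long time to pick up the changes. What could be causing this?

On 2022-11-04 16:52:08, "yinghua_zh@163.com" <yi...@163.com> wrote:
>You are running 2 DML statements in one tableEnv. Use a StatementSet to hold the 2 DML statements and call execute() on the StatementSet, as follows:
>    StatementSet statementSet = tenv.createStatementSet();
>    statementSet.addInsertSql(sql1);
>    statementSet.addInsertSql(sql2);
>    TableResult result = statementSet.execute();
>    result.getJobClient().get().getJobID().toString();
>
>Alternatively, remove the printing job and see whether the data can then be inserted into the target MySQL table.
>// Query
>tenv.executeSql("select * from flink_t_stu").print();   <-------------------- remove this job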

Re: FlinkCDC can read MySQL change data but cannot insert it into a new MySQL table

Posted by "yinghua_zh@163.com" <yi...@163.com>.
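You are running 2 DML statements in one tableEnv. Use a StatementSet to hold the 2 DML statements and call execute() on the StatementSet, as follows (sql1 and sql2 stand for your two INSERT statements):
    StatementSet statementSet = tenv.createStatementSet();
    statementSet.addInsertSql(sql1);
    statementSet.addInsertSql(sql2);
    TableResult result = statementSet.execute();
    String jobId = result.getJobClient().get().getJobID().toString();

Alternatively, remove the printing job and see whether the data can then be inserted into the target MySQL table:
// Query
tenv.executeSql("select * from flink_t_stu").print();   <-------------------- remove this job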



yinghua_zh@163.com
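A sketch of the StatementSet approach applied to this thread, assuming the goal is both to watch the changes on the console and to write them to t_stu2. Instead of a separate SELECT ... print() job, it adds a hypothetical print_t_stu table backed by Flink's built-in 'print' connector, so both INSERTs are planned together and submitted as one job by StatementSet.execute(). The table options are copied from the original post; print_t_stu and the class name are made up for illustration.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class CdcToJdbcWithStatementSet {
    // Column list shared by all three tables (same schema as in the original post).
    static final String COLUMNS = "userid INT, username STRING, age STRING, `partition` INT";

    // mysql-cdc source, options copied from the original post.
    static final String SOURCE_DDL = "CREATE TABLE flink_t_stu (" + COLUMNS
            + ", PRIMARY KEY (userid) NOT ENFORCED) WITH ("
            + " 'connector' = 'mysql-cdc', 'server-id' = '5401-5404',"
            + " 'scan.startup.mode' = 'latest-offset', 'hostname' = '192.168.0.220',"
            + " 'port' = '3306', 'username' = 'root', 'password' = 'root',"
            + " 'database-name' = 'zy', 'table-name' = 't_stu')";

    // JDBC sink, options copied from the original post.
    static final String JDBC_SINK_DDL = "CREATE TABLE flink_t_stu2 (" + COLUMNS
            + ", PRIMARY KEY (userid) NOT ENFORCED) WITH ("
            + " 'connector' = 'jdbc', 'url' = 'jdbc:mysql://192.168.0.220:3306/zy',"
            + " 'table-name' = 't_stu2', 'username' = 'root', 'password' = 'root')";

    // Hypothetical console sink that replaces the separate SELECT ... print() job.
    static final String PRINT_SINK_DDL =
            "CREATE TABLE print_t_stu (" + COLUMNS + ") WITH ('connector' = 'print')";

    static final String[] INSERTS = {
        "INSERT INTO flink_t_stu2 SELECT * FROM flink_t_stu",
        "INSERT INTO print_t_stu SELECT * FROM flink_t_stu"
    };

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

        // DDL statements only register the tables in the catalog.
        tenv.executeSql(SOURCE_DDL);
        tenv.executeSql(JDBC_SINK_DDL);
        tenv.executeSql(PRINT_SINK_DDL);

        // Both INSERTs are collected and submitted together as a single job.
        StatementSet statementSet = tenv.createStatementSet();
        for (String insert : INSERTS) {
            statementSet.addInsertSql(insert);
        }
        TableResult result = statementSet.execute();
        System.out.println("Submitted job " + result.getJobClient().get().getJobID());

        // No env.execute() here; await() keeps main() blocked while the unbounded job runs.
        result.await();
    }
}
```

The await() call matters mostly for local IDE runs, where returning from main() right after submission may tear down the local mini cluster while the streaming job is still running.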
 

Re: FlinkCDC can read MySQL change data but cannot insert it into a new MySQL table

Posted by Leonard Xu <xb...@gmail.com>.

> On Nov 4, 2022, at 2:34 PM, 左岩 <13...@163.com> wrote:
> 
> tenv.executeSql("xxx);
> env.execute();


This is not the correct way to use them; take a look at the Javadoc of these two methods.

Best regards,
Leonard
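Reading the original code against the Javadoc, two things stand out (my interpretation, not spelled out in the thread). First, TableResult.print() on a query over the unbounded CDC source keeps collecting rows and does not return, so the INSERT after it is never submitted. Second, executeSql() already submits an INSERT as its own job, so the trailing env.execute() has no DataStream operators to run; in Flink 1.14 that call is expected to throw an IllegalStateException, which may end a local run before any rows reach t_stu2. A minimal corrected sketch for the single-INSERT case, with a made-up class name and the table options copied from the original post:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class CdcToJdbcFixed {
    // Source and sink definitions copied from the original post.
    static final String SOURCE_DDL =
            "CREATE TABLE flink_t_stu (userid INT, username STRING, age STRING, `partition` INT,"
            + " PRIMARY KEY (userid) NOT ENFORCED) WITH ("
            + " 'connector' = 'mysql-cdc', 'server-id' = '5401-5404',"
            + " 'scan.startup.mode' = 'latest-offset', 'hostname' = '192.168.0.220',"
            + " 'port' = '3306', 'username' = 'root', 'password' = 'root',"
            + " 'database-name' = 'zy', 'table-name' = 't_stu')";

    static final String SINK_DDL =
            "CREATE TABLE flink_t_stu2 (userid INT, username STRING, age STRING, `partition` INT,"
            + " PRIMARY KEY (userid) NOT ENFORCED) WITH ("
            + " 'connector' = 'jdbc', 'url' = 'jdbc:mysql://192.168.0.220:3306/zy',"
            + " 'table-name' = 't_stu2', 'username' = 'root', 'password' = 'root')";

    static final String INSERT_SQL = "INSERT INTO flink_t_stu2 SELECT * FROM flink_t_stu";

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 10041);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.enableCheckpointing(60 * 1000 * 5, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointStorage("file:///d:/checkpoint3");
        env.setParallelism(1);
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

        tenv.executeSql(SOURCE_DDL);
        tenv.executeSql(SINK_DDL);

        // Removed: tenv.executeSql("select * from flink_t_stu").print();
        // On an unbounded source print() does not return, so nothing below it would run.

        // executeSql() on an INSERT submits the job by itself and returns immediately.
        TableResult insertResult = tenv.executeSql(INSERT_SQL);

        // Removed: env.execute(); no DataStream operators were defined on env.
        // Block main() instead, so a local run stays alive while the job is running.
        insertResult.await();
    }
}
```

If the console output is still wanted, the StatementSet variant with a 'print' sink table is one way to get both in a single job.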