Posted to user-zh@flink.apache.org by 左岩 <13...@163.com> on 2022/11/04 06:34:52 UTC
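Flink CDC can read MySQL change data, but cannot insert it into a new MySQL table
Flink CDC can read the MySQL change data, but the data is never inserted into the new MySQL table. Both tables have a primary key and identical schemas.
The code is below; see the attachment for the console output.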
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.setInteger("rest.port", 10041);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
    StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
    env.enableCheckpointing(60 * 1000 * 5, CheckpointingMode.EXACTLY_ONCE);
    env.getCheckpointConfig().setCheckpointStorage("file:///d:/checkpoint3");
    env.setParallelism(1);
    // Create the source table
    tenv.executeSql("CREATE TABLE flink_t_stu ( " +
        " userid INT, " +
        " username string, " +
        " age string, " +
        " `partition` INT, " +
        " PRIMARY KEY(userid) NOT ENFORCED " +
        " ) WITH ( " +
        " 'connector' = 'mysql-cdc', " +
        " 'server-id' = '5401-5404', " +
        " 'scan.startup.mode' = 'latest-offset', " +
        // " 'scan.startup.mode' = 'earliest-offset', " +
        " 'hostname' = '192.168.0.220', " +
        " 'port' = '3306', " +
        " 'username' = 'root', " +
        " 'password' = 'root', " +
        " 'database-name' = 'zy', " +
        " 'table-name' = 't_stu' " +
        ")");
    // Query
    tenv.executeSql("select * from flink_t_stu").print();
    // Create a target table to store the query result
    tenv.executeSql(
        "CREATE TABLE flink_t_stu2 ( " +
        " userid INT, " +
        " username string, " +
        " age string, " +
        " `partition` INT, " +
        " PRIMARY KEY(userid) NOT ENFORCED " +
        " ) WITH ( " +
        " 'connector' = 'jdbc', " +
        " 'url' = 'jdbc:mysql://192.168.0.220:3306/zy', " +
        " 'table-name' = 't_stu2', " +
        " 'username' = 'root', " +
        " 'password' = 'root' " +
        ")"
    );
    tenv.executeSql("INSERT INTO flink_t_stu2 " +
        "SELECT * FROM flink_t_stu");
    env.execute();
}
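Re: Re: Re: Flink CDC can read MySQL change data, but cannot insert it into a new MySQL table
Posted by 左岩 <13...@163.com>.
The CDC connector is version 2.3, which I compiled myself for Flink 1.14. There is another problem as well: the change data can be read, but late. For example, if a row is written to MySQL at 11:30, Flink CDC reads it several minutes afterwards; newly written or changed data only shows up 5 to 7 minutes later. The second problem is that the change data cannot be inserted into the other MySQL table.
On 2022-11-07 10:11:56, "Shengkai Fang" <fs...@gmail.com> wrote:
>Which version of CDC are you using? The incremental part of the data can only be read after the full (snapshot) part has been read completely.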
>
>Best,
>Shengkai
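One possible factor in the 5 to 7 minute delay described above, offered as an assumption and not something confirmed in this thread: the job checkpoints every 60 * 1000 * 5 ms in EXACTLY_ONCE mode, and the Flink documentation notes that with exactly-once checkpointing the rows returned by TableResult.print() become visible to the client only after the corresponding checkpoint completes. The sketch below only does the interval arithmetic; the class name and the 10-second debug value are hypothetical.

```java
import java.time.Duration;

// Hypothetical helper; it only illustrates the interval arithmetic from the post.
public class CheckpointIntervalCheck {
    // Interval passed to env.enableCheckpointing(...) in the original post.
    static final long ORIGINAL_INTERVAL_MS = 60 * 1000 * 5;

    // Assumed shorter interval for local debugging; not taken from the thread.
    static final long DEBUG_INTERVAL_MS = Duration.ofSeconds(10).toMillis();

    // Converts the original interval to whole minutes.
    static long originalIntervalMinutes() {
        return Duration.ofMillis(ORIGINAL_INTERVAL_MS).toMinutes();
    }

    public static void main(String[] args) {
        System.out.println("original interval: " + originalIntervalMinutes() + " min");
        System.out.println("debug interval: " + DEBUG_INTERVAL_MS + " ms");
    }
}
```

If this is the cause, passing a smaller value to enableCheckpointing while debugging should make printed rows appear sooner; whether it also affects the JDBC sink in this setup is not established here.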
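Re: Re: Flink CDC can read MySQL change data, but cannot insert it into a new MySQL table
Posted by Shengkai Fang <fs...@gmail.com>.
Which version of CDC are you using? The incremental part of the data can only be read after the full (snapshot) part has been read completely.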
Best,
Shengkai
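左岩 <13...@163.com> wrote on Fri, Nov 4, 2022 at 17:58:
>
> Removing .print() does not help either, and the problem is unrelated to that point (one tableEnv executing two DML statements). There is also another symptom: after I change data in the MySQL table, Flink CDC takes a long time to read the changed data. What is going on here?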
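Re: Re: Flink CDC can read MySQL change data, but cannot insert it into a new MySQL table
Posted by 左岩 <13...@163.com>.
Removing .print() does not help either, and the problem is unrelated to that point (one tableEnv executing two DML statements). There is also another symptom: after I change data in the MySQL table, Flink CDC takes a long time to read the changed data. What is going on here?
On 2022-11-04 16:52:08, "yinghua_zh@163.com" <yi...@163.com> wrote:
>You are executing two DML statements in one tableEnv. You need to collect both DML statements in a StatementSet and call execute on the StatementSet, as follows:
> StatementSet statementSet = tenv.createStatementSet();
> statementSet.addInsertSql(sql1);
> statementSet.addInsertSql(sql2);
> TableResult result = statementSet.execute();
> result.getJobClient().get().getJobID().toString();
>
>
>Alternatively, remove the print job and check whether the data can then be inserted into the target MySQL table.
>// Query
>tenv.executeSql("select * from flink_t_stu").print(); // <-- remove this job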
Re: Flink CDC can read MySQL change data, but cannot insert it into a new MySQL table
Posted by "yinghua_zh@163.com" <yi...@163.com>.
You are executing two DML statements in one tableEnv. You need to collect both DML statements in a StatementSet and call execute on the StatementSet, as follows:
StatementSet statementSet = tenv.createStatementSet();
statementSet.addInsertSql(sql1);
statementSet.addInsertSql(sql2);
TableResult result = statementSet.execute();
result.getJobClient().get().getJobID().toString();
Alternatively, remove the print job and check whether the data can then be inserted into the target MySQL table.
// Query
tenv.executeSql("select * from flink_t_stu").print(); // <-- remove this job
yinghua_zh@163.com
Re: Flink CDC can read MySQL change data, but cannot insert it into a new MySQL table
Posted by Leonard Xu <xb...@gmail.com>.
> On Nov 4, 2022, at 2:34 PM, 左岩 <13...@163.com> wrote:
>
> tenv.executeSql("xxx");
> env.execute();
This usage is incorrect. Take a look at the Javadoc of these two methods.
Best regards,
Leonard
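
A minimal sketch of what the remark above points at, assuming Flink 1.14 with the mysql-cdc and jdbc connectors from the original post on the classpath. executeSql() submits an INSERT job by itself, so env.execute() is not needed; it only runs operators built with the DataStream API and has nothing to run in a pure SQL pipeline. The class name CdcToJdbcJob and the 10-second checkpoint interval are illustrative choices, not taken from the thread. The unbounded SELECT with print() is left out because print() blocks on a streaming query, so the statements after it would not be reached.

```java
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

// Hypothetical class name; connection settings are copied from the original post.
public class CdcToJdbcJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Assumed interval (10 s) for quicker feedback; the original post used 5 minutes.
        env.enableCheckpointing(10_000L, CheckpointingMode.EXACTLY_ONCE);
        env.setParallelism(1);
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

        // DDL only registers the table in the catalog; it does not start a job.
        tenv.executeSql(
            "CREATE TABLE flink_t_stu ("
                + " userid INT,"
                + " username STRING,"
                + " age STRING,"
                + " `partition` INT,"
                + " PRIMARY KEY (userid) NOT ENFORCED"
                + ") WITH ("
                + " 'connector' = 'mysql-cdc',"
                + " 'server-id' = '5401-5404',"
                + " 'scan.startup.mode' = 'latest-offset',"
                + " 'hostname' = '192.168.0.220',"
                + " 'port' = '3306',"
                + " 'username' = 'root',"
                + " 'password' = 'root',"
                + " 'database-name' = 'zy',"
                + " 'table-name' = 't_stu'"
                + ")");

        tenv.executeSql(
            "CREATE TABLE flink_t_stu2 ("
                + " userid INT,"
                + " username STRING,"
                + " age STRING,"
                + " `partition` INT,"
                + " PRIMARY KEY (userid) NOT ENFORCED"
                + ") WITH ("
                + " 'connector' = 'jdbc',"
                + " 'url' = 'jdbc:mysql://192.168.0.220:3306/zy',"
                + " 'table-name' = 't_stu2',"
                + " 'username' = 'root',"
                + " 'password' = 'root'"
                + ")");

        // The INSERT is submitted as a streaming job by executeSql() itself.
        TableResult result =
            tenv.executeSql("INSERT INTO flink_t_stu2 SELECT * FROM flink_t_stu");

        // Block the main thread so a local (IDE) run does not exit while the job runs.
        result.await();
        // No env.execute() here: no DataStream operators were defined.
    }
}
```

The await() call matters mainly for local runs; when the job is submitted to a cluster it can usually be dropped. This sketch does not claim to be the confirmed fix for the original report, since the thread ends without one.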