You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by "yinghua_zh@163.com" <yi...@163.com> on 2022/11/03 03:53:12 UTC
回复: flinkcdc 读不到mysql中数据
要在mysql表上t_stu上建个主键,只在FlinkSQL上指定主键不行。
yinghua_zh@163.com
发件人: 左岩
发送时间: 2022-11-03 11:34
收件人: user-zh
主题: flinkcdc 读不到mysql中数据
我用的是flink1.14 ,因为官方没有匹配的版本,所以自己编译的flinkCDC,binlog也开启了,然后也没报错,读不到mysql的数据,idea控制台不报错也不输出数据,可能是什么原因呢(运行日志见附件)
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.setInteger("rest.port", 10041);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointStorage("file:///d:/checkpoint3");
// StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
env.setParallelism(4);
// 建表
tenv.executeSql("CREATE TABLE flink_t_stu ( " +
" userid INT, " +
" username string, " +
" age string, " +
" `partition` INT, " +
" PRIMARY KEY(userid) NOT ENFORCED " +
" ) WITH ( " +
" 'connector' = 'mysql-cdc', " +
" 'server-id' = '5401-5404', " +
" 'hostname' = '192.168.0.220', " +
" 'port' = '3306', " +
" 'username' = 'root', " +
" 'password' = 'root', " +
" 'database-name' = 'zy', " +
" 'table-name' = 't_stu' " +
")");
// 查询
tenv.executeSql("select * from flink_t_stu").print();
env.execute();
}
Re: flinkcdc 读不到mysql中数据
Posted by yinghua_zh <yi...@163.com>.
要在mysql表上t_stu上建个主键,只在FlinkSQL上指定主键不行。
> 在 2022年11月3日,11:54,yinghua_zh@163.com 写道:
>
> 要在mysql表上t_stu上建个主键,只在FlinkSQL上指定主键不行。
>
>
>
> yinghua_zh@163.com
>
> 发件人: 左岩
> 发送时间: 2022-11-03 11:34
> 收件人: user-zh
> 主题: flinkcdc 读不到mysql中数据
> 我用的是flink1.14 ,因为官方没有匹配的版本,所以自己编译的flinkCDC,binlog也开启了,然后也没报错,读不到mysql的数据,idea控制台不报错也不输出数据,可能是什么原因呢(运行日志见附件)
> public static void main(String[] args) throws Exception {
> Configuration conf = new Configuration();
> conf.setInteger("rest.port", 10041);
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
> StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
>
> env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE);
> env.getCheckpointConfig().setCheckpointStorage("file:///d:/checkpoint3");
>
> // StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
>
> env.setParallelism(4);
>
> // 建表
> tenv.executeSql("CREATE TABLE flink_t_stu ( " +
> " userid INT, " +
> " username string, " +
> " age string, " +
> " `partition` INT, " +
> " PRIMARY KEY(userid) NOT ENFORCED " +
> " ) WITH ( " +
> " 'connector' = 'mysql-cdc', " +
> " 'server-id' = '5401-5404', " +
> " 'hostname' = '192.168.0.220', " +
> " 'port' = '3306', " +
> " 'username' = 'root', " +
> " 'password' = 'root', " +
> " 'database-name' = 'zy', " +
> " 'table-name' = 't_stu' " +
> ")");
>
> // 查询
> tenv.executeSql("select * from flink_t_stu").print();
>
> env.execute();
>
> }