You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by "yinghua_zh@163.com" <yi...@163.com> on 2022/11/03 03:53:12 UTC

回复: flinkcdc 读不到mysql中数据

要在mysql表上t_stu上建个主键,只在FlinkSQL上指定主键不行。



yinghua_zh@163.com
 
发件人: 左岩
发送时间: 2022-11-03 11:34
收件人: user-zh
主题: flinkcdc 读不到mysql中数据
我用的是flink1.14 ,因为官方没有匹配的版本,所以自己编译的flinkCDC,binlog也开启了,然后也没报错,读不到mysql的数据,idea控制台不报错也不输出数据,可能是什么原因呢(运行日志见附件)
public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 10041);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

        env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointStorage("file:///d:/checkpoint3");

//        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

        env.setParallelism(4);

        // 建表
        tenv.executeSql("CREATE TABLE flink_t_stu ( " +
                "      userid INT, " +
                "      username string, " +
                "      age string, " +
                "      `partition` INT, " +
                "     PRIMARY KEY(userid) NOT ENFORCED " +
                "     ) WITH ( " +
                "     'connector' = 'mysql-cdc', " +
                "     'server-id' = '5401-5404', " +
                "     'hostname' = '192.168.0.220', " +
                "     'port' = '3306', " +
                "     'username' = 'root', " +
                "     'password' = 'root', " +
                "     'database-name' = 'zy', " +
                "     'table-name' = 't_stu' " +
                ")");

        // 查询
        tenv.executeSql("select * from flink_t_stu").print();

        env.execute();

    }

Re: flinkcdc 读不到mysql中数据

Posted by yinghua_zh <yi...@163.com>.
要在mysql表上t_stu上建个主键,只在FlinkSQL上指定主键不行。

> 在 2022年11月3日,11:54,yinghua_zh@163.com 写道:
> 
> 要在mysql表上t_stu上建个主键,只在FlinkSQL上指定主键不行。
> 
> 
> 
> yinghua_zh@163.com
> 
> 发件人: 左岩
> 发送时间: 2022-11-03 11:34
> 收件人: user-zh
> 主题: flinkcdc 读不到mysql中数据
> 我用的是flink1.14 ,因为官方没有匹配的版本,所以自己编译的flinkCDC,binlog也开启了,然后也没报错,读不到mysql的数据,idea控制台不报错也不输出数据,可能是什么原因呢(运行日志见附件)
> public static void main(String[] args) throws Exception {
>        Configuration conf = new Configuration();
>        conf.setInteger("rest.port", 10041);
>        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
>        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
> 
>        env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE);
>        env.getCheckpointConfig().setCheckpointStorage("file:///d:/checkpoint3");
> 
> //        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
> 
>        env.setParallelism(4);
> 
>        // 建表
>        tenv.executeSql("CREATE TABLE flink_t_stu ( " +
>                "      userid INT, " +
>                "      username string, " +
>                "      age string, " +
>                "      `partition` INT, " +
>                "     PRIMARY KEY(userid) NOT ENFORCED " +
>                "     ) WITH ( " +
>                "     'connector' = 'mysql-cdc', " +
>                "     'server-id' = '5401-5404', " +
>                "     'hostname' = '192.168.0.220', " +
>                "     'port' = '3306', " +
>                "     'username' = 'root', " +
>                "     'password' = 'root', " +
>                "     'database-name' = 'zy', " +
>                "     'table-name' = 't_stu' " +
>                ")");
> 
>        // 查询
>        tenv.executeSql("select * from flink_t_stu").print();
> 
>        env.execute();
> 
>    }