You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by 左岩 <13...@163.com> on 2022/11/03 03:34:24 UTC

flinkcdc 读不到mysql中数据

我用的是flink1.14 ,因为官方没有匹配的版本,所以自己编译的flinkCDC,binlog也开启了,然后也没报错,读不到mysql的数据,idea控制台不报错也不输出数据,可能是什么原因呢(运行日志见附件)
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.setInteger("rest.port", 10041);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointStorage("file:///d:/checkpoint3");

//        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

env.setParallelism(4);

// 建表
tenv.executeSql("CREATE TABLE flink_t_stu ( " +
"      userid INT, " +
"      username string, " +
"      age string, " +
"      `partition` INT, " +
"     PRIMARY KEY(userid) NOT ENFORCED " +
"     ) WITH ( " +
"     'connector' = 'mysql-cdc', " +
"     'server-id' = '5401-5404', " +
"     'hostname' = '192.168.0.220', " +
"     'port' = '3306', " +
"     'username' = 'root', " +
"     'password' = 'root', " +
"     'database-name' = 'zy', " +
"     'table-name' = 't_stu' " +
")");

// 查询
tenv.executeSql("select * from flink_t_stu").print();

env.execute();

    }

Re: flinkcdc 读不到mysql中数据

Posted by Leonard Xu <xb...@gmail.com>.
Flink CDC 社区有提供1.14支持的,2.2.1版本即可。你这个好像是没有开启checkpoint, 开启下就好了。
// enable checkpoint
env.enableCheckpointing(1000);


祝好,
Leonard

> 2022年11月3日 上午11:34,左岩 <13...@163.com> 写道:
> 
> 我用的是flink1.14 ,因为官方没有匹配的版本,所以自己编译的flinkCDC,binlog也开启了,然后也没报错,读不到mysql的数据,idea控制台不报错也不输出数据,可能是什么原因呢(运行日志见附件)
> public static void main(String[] args) throws Exception {
>         Configuration conf = new Configuration();
>         conf.setInteger("rest.port", 10041);
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
>         StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
> 
>         env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE);
>         env.getCheckpointConfig().setCheckpointStorage("file:///d:/checkpoint3");
> 
> //        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
> 
>         env.setParallelism(4);
> 
>         // 建表
>         tenv.executeSql("CREATE TABLE flink_t_stu ( " +
>                 "      userid INT, " +
>                 "      username string, " +
>                 "      age string, " +
>                 "      `partition` INT, " +
>                 "     PRIMARY KEY(userid) NOT ENFORCED " +
>                 "     ) WITH ( " +
>                 "     'connector' = 'mysql-cdc', " +
>                 "     'server-id' = '5401-5404', " +
>                 "     'hostname' = '192.168.0.220', " +
>                 "     'port' = '3306', " +
>                 "     'username' = 'root', " +
>                 "     'password' = 'root', " +
>                 "     'database-name' = 'zy', " +
>                 "     'table-name' = 't_stu' " +
>                 ")");
> 
>         // 查询
>         tenv.executeSql("select * from flink_t_stu").print();
> 
>         env.execute();
> 
>     }
> <idealog2.txt>


Re: flinkcdc 读不到mysql中数据

Posted by yinghua_zh <yi...@163.com>.
要在mysql表上t_stu上建个主键,只在FlinkSQL上指定主键不行。

> 在 2022年11月3日,11:54,yinghua_zh@163.com 写道:
> 
> 要在mysql表上t_stu上建个主键,只在FlinkSQL上指定主键不行。
> 
> 
> 
> yinghua_zh@163.com
> 
> 发件人: 左岩
> 发送时间: 2022-11-03 11:34
> 收件人: user-zh
> 主题: flinkcdc 读不到mysql中数据
> 我用的是flink1.14 ,因为官方没有匹配的版本,所以自己编译的flinkCDC,binlog也开启了,然后也没报错,读不到mysql的数据,idea控制台不报错也不输出数据,可能是什么原因呢(运行日志见附件)
> public static void main(String[] args) throws Exception {
>        Configuration conf = new Configuration();
>        conf.setInteger("rest.port", 10041);
>        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
>        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
> 
>        env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE);
>        env.getCheckpointConfig().setCheckpointStorage("file:///d:/checkpoint3");
> 
> //        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
> 
>        env.setParallelism(4);
> 
>        // 建表
>        tenv.executeSql("CREATE TABLE flink_t_stu ( " +
>                "      userid INT, " +
>                "      username string, " +
>                "      age string, " +
>                "      `partition` INT, " +
>                "     PRIMARY KEY(userid) NOT ENFORCED " +
>                "     ) WITH ( " +
>                "     'connector' = 'mysql-cdc', " +
>                "     'server-id' = '5401-5404', " +
>                "     'hostname' = '192.168.0.220', " +
>                "     'port' = '3306', " +
>                "     'username' = 'root', " +
>                "     'password' = 'root', " +
>                "     'database-name' = 'zy', " +
>                "     'table-name' = 't_stu' " +
>                ")");
> 
>        // 查询
>        tenv.executeSql("select * from flink_t_stu").print();
> 
>        env.execute();
> 
>    }


回复: flinkcdc 读不到mysql中数据

Posted by "yinghua_zh@163.com" <yi...@163.com>.
要在mysql表上t_stu上建个主键,只在FlinkSQL上指定主键不行。



yinghua_zh@163.com
 
发件人: 左岩
发送时间: 2022-11-03 11:34
收件人: user-zh
主题: flinkcdc 读不到mysql中数据
我用的是flink1.14 ,因为官方没有匹配的版本,所以自己编译的flinkCDC,binlog也开启了,然后也没报错,读不到mysql的数据,idea控制台不报错也不输出数据,可能是什么原因呢(运行日志见附件)
public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 10041);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

        env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointStorage("file:///d:/checkpoint3");

//        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

        env.setParallelism(4);

        // 建表
        tenv.executeSql("CREATE TABLE flink_t_stu ( " +
                "      userid INT, " +
                "      username string, " +
                "      age string, " +
                "      `partition` INT, " +
                "     PRIMARY KEY(userid) NOT ENFORCED " +
                "     ) WITH ( " +
                "     'connector' = 'mysql-cdc', " +
                "     'server-id' = '5401-5404', " +
                "     'hostname' = '192.168.0.220', " +
                "     'port' = '3306', " +
                "     'username' = 'root', " +
                "     'password' = 'root', " +
                "     'database-name' = 'zy', " +
                "     'table-name' = 't_stu' " +
                ")");

        // 查询
        tenv.executeSql("select * from flink_t_stu").print();

        env.execute();

    }