Posted to user-zh@flink.apache.org by 曹武 <14...@163.com> on 2020/07/23 08:14:12 UTC
flink 1.11 DDL: problem writing to MySQL
I'm using Flink 1.11.1 for CDC and find that only about 100 rows per second get written to MySQL. Is there a way to optimize this, or any suggestions for an alternative batch-write approach?
The code is as follows:
// Kafka source carrying Debezium-format CDC change events.
String sourceDdl = "CREATE TABLE debezium_source (" +
        "  id STRING NOT NULL," +
        "  name STRING," +
        "  description STRING," +
        "  weight DOUBLE" +
        ") WITH (" +
        "  'connector' = 'kafka-0.11'," +
        "  'topic' = 'test0717'," +
        "  'properties.bootstrap.servers' = '172.22.20.206:9092'," +
        "  'scan.startup.mode' = 'group-offsets'," +
        "  'properties.group.id' = 'test'," +
        "  'format' = 'debezium-json'," +
        "  'debezium-json.schema-include' = 'false'," +
        "  'debezium-json.ignore-parse-errors' = 'true'" +
        ")";
tEnv.executeSql(sourceDdl);
System.out.println("init source ddl successful ==> " + sourceDdl);

// MySQL upsert sink via the JDBC connector.
String sinkDdl = "CREATE TABLE sink (" +
        "  id STRING NOT NULL," +
        "  name STRING," +
        "  description STRING," +
        "  weight DOUBLE," +
        "  PRIMARY KEY (id) NOT ENFORCED" +
        ") WITH (" +
        "  'connector' = 'jdbc'," +
        "  'url' = 'jdbc:mysql://127.0.0.1:3306/test?autoReconnect=true'," +
        "  'table-name' = 'table-out'," +
        "  'driver' = 'com.mysql.cj.jdbc.Driver'," +
        "  'sink.buffer-flush.interval' = '1s'," +
        "  'sink.buffer-flush.max-rows' = '1000'," +
        "  'username' = 'DataPip'," +
        "  'password' = 'DataPip'" +
        ")";
tEnv.executeSql(sinkDdl);
System.out.println("init sink ddl successful ==> " + sinkDdl);

String dml = "INSERT INTO sink SELECT id, name, description, weight FROM debezium_source";
System.out.println("execute dml ==> " + dml);
tEnv.executeSql(dml);

// Mirror the source to a print sink for debugging.
tEnv.executeSql("CREATE TABLE print_table WITH ('connector' = 'print') " +
        "LIKE debezium_source (EXCLUDING ALL)");
tEnv.executeSql("INSERT INTO print_table SELECT id, name, description, weight FROM debezium_source");
--
Sent from: http://apache-flink.147419.n8.nabble.com/
Re: flink 1.11 DDL: problem writing to MySQL
Posted by Jark Wu <im...@gmail.com>.
What rate is the Kafka source producing data at? Could it be that the source itself only produces about 100 records per second?
Btw, checking the backpressure status is a good way to troubleshoot.
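If it helps, here is a minimal sketch of polling Flink's back-pressure REST endpoint programmatically (assumptions: the JobManager REST API is reachable on localhost:8081, and <job-id>/<sink-vertex-id> are placeholders you would take from the web UI or from GET /jobs):

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.stream.Collectors;

public class BackpressureCheck {
    public static void main(String[] args) throws Exception {
        // Placeholders: look up the real IDs in the web UI or via GET /jobs.
        String jobId = "<job-id>";
        String vertexId = "<sink-vertex-id>";
        URL url = new URL("http://localhost:8081/jobs/" + jobId
                + "/vertices/" + vertexId + "/backpressure");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("GET");
        try (BufferedReader in = new BufferedReader(
                new InputStreamReader(conn.getInputStream()))) {
            // The first call triggers a sample; poll again after a few
            // seconds to see the per-subtask back-pressure ratios.
            System.out.println(in.lines().collect(Collectors.joining("\n")));
        }
    }
}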
On Thu, 23 Jul 2020 at 20:25, godfrey he <go...@gmail.com> wrote:
> Have you observed backpressure caused by the sink not keeping up with writes?
> Or try increasing the flush interval so that each buffer accumulates more data.
Re: flink 1.11 DDL: problem writing to MySQL
Posted by godfrey he <go...@gmail.com>.
Have you observed backpressure caused by the sink not keeping up with writes?
Or try increasing the flush interval so that each buffer accumulates more data.
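For example, a sketch of a retuned sink DDL (the values are illustrative, not recommendations; rewriteBatchedStatements=true is a standard MySQL Connector/J option):

String tunedSinkDdl = "CREATE TABLE sink (" +
        "  id STRING NOT NULL," +
        "  name STRING," +
        "  description STRING," +
        "  weight DOUBLE," +
        "  PRIMARY KEY (id) NOT ENFORCED" +
        ") WITH (" +
        "  'connector' = 'jdbc'," +
        // rewriteBatchedStatements=true lets Connector/J send the buffered
        // rows as multi-row INSERTs instead of one round trip per row.
        "  'url' = 'jdbc:mysql://127.0.0.1:3306/test?autoReconnect=true&rewriteBatchedStatements=true'," +
        "  'table-name' = 'table-out'," +
        "  'driver' = 'com.mysql.cj.jdbc.Driver'," +
        // A longer interval and a larger row cap let each flush batch more rows.
        "  'sink.buffer-flush.interval' = '5s'," +
        "  'sink.buffer-flush.max-rows' = '5000'," +
        "  'username' = 'DataPip'," +
        "  'password' = 'DataPip'" +
        ")";
tEnv.executeSql(tunedSinkDdl);

Whether the flush settings help depends on where the bottleneck is; if the source really only produces ~100 records per second, as Jark suggests, no sink tuning will change the throughput.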