You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by chenxuying <cx...@163.com> on 2020/07/31 08:12:40 UTC
flinksql kafka 插入mysql 的insert语法问题和唯一主键的问题
hi
我使用的flink 1.11.0版本
代码如下
StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
TableEnvironment tableEnvironment = StreamTableEnvironment.create(streamEnv);
tableEnvironment.executeSql(" " +
" CREATE TABLE mySource ( " +
" a bigint, " +
" b bigint " +
" ) WITH ( " +
" 'connector.type' = 'kafka', " +
" 'connector.version' = 'universal', " +
" 'connector.topic' = 'mytesttopic', " +
" 'connector.properties.zookeeper.connect' = '172.17.0.2:2181', " +
" 'connector.properties.bootstrap.servers' = '172.17.0.2:9092', " +
" 'connector.properties.group.id' = 'flink-test-cxy', " +
" 'connector.startup-mode' = 'latest-offset', " +
" 'format.type' = 'json' " +
" ) ");
tableEnvironment.executeSql("CREATE TABLE mysqlsink ( " +
" id bigint, " +
" game_id varchar, " +
" PRIMARY KEY (id) NOT ENFORCED " +
" ) " +
" with ( " +
" 'connector.type' = 'jdbc', " +
" 'connector.url' = 'jdbc:mysql://47.99.181.86:3306/flinksql?useSSL=false' , " +
" 'connector.username' = 'root' , " +
" 'connector.password' = 'root', " +
" 'connector.table' = 'mysqlsink' , " +
" 'connector.driver' = 'com.mysql.cj.jdbc.Driver' , " +
" 'connector.write.flush.interval' = '2s', " +
" 'connector.write.flush.max-rows' = '300' " +
" )");
tableEnvironment.executeSql("insert into mysqlsink (`id`,`game_id`) values (select a,cast(b as varchar) b from mySource)");
问题一 : 上面的insert语句会出现如下错误
Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Cannot apply '$SCALAR_QUERY' to arguments of type '$SCALAR_QUERY(<RECORDTYPE(BIGINT A, VARCHAR(2147483647) B)>)'. Supported form(s): '$SCALAR_QUERY(<RECORDTYPE(SINGLE FIELD)>)'
问题二 : 如果insert改成 tableEnvironment.executeSql("insert into mysqlsink select a,cast(b as varchar) b from mySource"); 是可以运行的,但是出现唯一主键重复时会报错
Caused by: java.sql.SQLIntegrityConstraintViolationException: Duplicate entry '1' for key 'PRIMARY'
Re:Re: flinksql kafka 插入mysql 的insert语法问题和唯一主键的问题
Posted by chenxuying <cx...@163.com>.
hi
ok,谢谢,懂了哈哈
在 2020-07-31 21:27:02,"Leonard Xu" <xb...@gmail.com> 写道:
>Hello
>
>> 在 2020年7月31日,21:13,chenxuying <cx...@163.com> 写道:
>>
>> 但是不太明白 "使用老的option参数还是需要根据query推导主键" 这里话是什么意思,需要怎么做
>
>简单来讲,如果使用的是老版本(1.10)的option参数,代码执行的路径就和1.10版本一样的,1.10版本里是不支持定义 PRIMARY KEY 的,
>是通过用户的query来决定写入的模式是upsert 还是 append , 你可以看下1.10的文档关于用query 推导 写入模式的文档[1], 如果已经在用1.11了,1.10的文档可以不用看的。
>
>在1.10里经常出现query 推导不出 key 导致无法做upsert写入的case, 在1.11里通过支持定义 PRIMARY KEY,不会再有类似问题.1.11的文档参考[2]。
>
>祝好
>Leonard
>
>[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#jdbc-connector <https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#jdbc-connector>
>[2] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/jdbc.html#how-to-create-a-jdbc-table <https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/jdbc.html#how-to-create-a-jdbc-table>
Re: flinksql kafka 插入mysql 的insert语法问题和唯一主键的问题
Posted by Leonard Xu <xb...@gmail.com>.
Hello
> 在 2020年7月31日,21:13,chenxuying <cx...@163.com> 写道:
>
> 但是不太明白 "使用老的option参数还是需要根据query推导主键" 这里话是什么意思,需要怎么做
简单来讲,如果使用的是老版本(1.10)的option参数,代码执行的路径就和1.10版本一样的,1.10版本里是不支持定义 PRIMARY KEY 的,
是通过用户的query来决定写入的模式是upsert 还是 append , 你可以看下1.10的文档关于用query 推导 写入模式的文档[1], 如果已经在用1.11了,1.10的文档可以不用看的。
在1.10里经常出现query 推导不出 key 导致无法做upsert写入的case, 在1.11里通过支持定义 PRIMARY KEY,不会再有类似问题.1.11的文档参考[2]。
祝好
Leonard
[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#jdbc-connector <https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#jdbc-connector>
[2] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/jdbc.html#how-to-create-a-jdbc-table <https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/jdbc.html#how-to-create-a-jdbc-table>
Re:Re: flinksql kafka 插入mysql 的insert语法问题和唯一主键的问题
Posted by chenxuying <cx...@163.com>.
谢谢回答
使用新属性可以 成功修改记录 ,
但是不太明白 "使用老的option参数还是需要根据query推导主键" 这里话是什么意思,需要怎么做
在 2020-07-31 16:46:41,"Leonard Xu" <xb...@gmail.com> 写道:
>Hi, chenxuying
>
>看你还是用的还是 " 'connector.type' = 'jdbc', …. " ,这是老的option,使用老的option参数还是需要根据query推导主键,
>需要使用新的属性[1]:" 'connector' = 'jdbc’,…." 才能配合 主键 决定 upsert 模式.
>
>Best
>Leonard
>[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/connectors/jdbc.html#connector-options <https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/connectors/jdbc.html#connector-options>
>
>> 在 2020年7月31日,16:12,chenxuying <cx...@163.com> 写道:
>>
>> hi
>> 我使用的flink 1.11.0版本
>> 代码如下
>> StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
>> TableEnvironment tableEnvironment = StreamTableEnvironment.create(streamEnv);
>> tableEnvironment.executeSql(" " +
>> " CREATE TABLE mySource ( " +
>> " a bigint, " +
>> " b bigint " +
>> " ) WITH ( " +
>> " 'connector.type' = 'kafka', " +
>> " 'connector.version' = 'universal', " +
>> " 'connector.topic' = 'mytesttopic', " +
>> " 'connector.properties.zookeeper.connect' = '172.17.0.2:2181', " +
>> " 'connector.properties.bootstrap.servers' = '172.17.0.2:9092', " +
>> " 'connector.properties.group.id' = 'flink-test-cxy', " +
>> " 'connector.startup-mode' = 'latest-offset', " +
>> " 'format.type' = 'json' " +
>> " ) ");
>> tableEnvironment.executeSql("CREATE TABLE mysqlsink ( " +
>> " id bigint, " +
>> " game_id varchar, " +
>> " PRIMARY KEY (id) NOT ENFORCED " +
>> " ) " +
>> " with ( " +
>> " 'connector.type' = 'jdbc', " +
>> " 'connector.url' = 'jdbc:mysql://47.99.181.86:3306/flinksql?useSSL=false' , " +
>> " 'connector.username' = 'root' , " +
>> " 'connector.password' = 'root', " +
>> " 'connector.table' = 'mysqlsink' , " +
>> " 'connector.driver' = 'com.mysql.cj.jdbc.Driver' , " +
>> " 'connector.write.flush.interval' = '2s', " +
>> " 'connector.write.flush.max-rows' = '300' " +
>> " )");
>> tableEnvironment.executeSql("insert into mysqlsink (`id`,`game_id`) values (select a,cast(b as varchar) b from mySource)");
>>
>>
>> 问题一 : 上面的insert语句会出现如下错误
>> Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Cannot apply '$SCALAR_QUERY' to arguments of type '$SCALAR_QUERY(<RECORDTYPE(BIGINT A, VARCHAR(2147483647) B)>)'. Supported form(s): '$SCALAR_QUERY(<RECORDTYPE(SINGLE FIELD)>)'
>>
>>
>> 问题二 : 如果insert改成 tableEnvironment.executeSql("insert into mysqlsink select a,cast(b as varchar) b from mySource"); 是可以运行的,但是出现唯一主键重复时会报错
>> Caused by: java.sql.SQLIntegrityConstraintViolationException: Duplicate entry '1' for key 'PRIMARY'
>>
>>
>>
>
Re: flinksql kafka 插入mysql 的insert语法问题和唯一主键的问题
Posted by Leonard Xu <xb...@gmail.com>.
Hi, chenxuying
看你还是用的还是 " 'connector.type' = 'jdbc', …. " ,这是老的option,使用老的option参数还是需要根据query推导主键,
需要使用新的属性[1]:" 'connector' = 'jdbc’,…." 才能配合 主键 决定 upsert 模式.
Best
Leonard
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/connectors/jdbc.html#connector-options <https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/connectors/jdbc.html#connector-options>
> 在 2020年7月31日,16:12,chenxuying <cx...@163.com> 写道:
>
> hi
> 我使用的flink 1.11.0版本
> 代码如下
> StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
> TableEnvironment tableEnvironment = StreamTableEnvironment.create(streamEnv);
> tableEnvironment.executeSql(" " +
> " CREATE TABLE mySource ( " +
> " a bigint, " +
> " b bigint " +
> " ) WITH ( " +
> " 'connector.type' = 'kafka', " +
> " 'connector.version' = 'universal', " +
> " 'connector.topic' = 'mytesttopic', " +
> " 'connector.properties.zookeeper.connect' = '172.17.0.2:2181', " +
> " 'connector.properties.bootstrap.servers' = '172.17.0.2:9092', " +
> " 'connector.properties.group.id' = 'flink-test-cxy', " +
> " 'connector.startup-mode' = 'latest-offset', " +
> " 'format.type' = 'json' " +
> " ) ");
> tableEnvironment.executeSql("CREATE TABLE mysqlsink ( " +
> " id bigint, " +
> " game_id varchar, " +
> " PRIMARY KEY (id) NOT ENFORCED " +
> " ) " +
> " with ( " +
> " 'connector.type' = 'jdbc', " +
> " 'connector.url' = 'jdbc:mysql://47.99.181.86:3306/flinksql?useSSL=false' , " +
> " 'connector.username' = 'root' , " +
> " 'connector.password' = 'root', " +
> " 'connector.table' = 'mysqlsink' , " +
> " 'connector.driver' = 'com.mysql.cj.jdbc.Driver' , " +
> " 'connector.write.flush.interval' = '2s', " +
> " 'connector.write.flush.max-rows' = '300' " +
> " )");
> tableEnvironment.executeSql("insert into mysqlsink (`id`,`game_id`) values (select a,cast(b as varchar) b from mySource)");
>
>
> 问题一 : 上面的insert语句会出现如下错误
> Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Cannot apply '$SCALAR_QUERY' to arguments of type '$SCALAR_QUERY(<RECORDTYPE(BIGINT A, VARCHAR(2147483647) B)>)'. Supported form(s): '$SCALAR_QUERY(<RECORDTYPE(SINGLE FIELD)>)'
>
>
> 问题二 : 如果insert改成 tableEnvironment.executeSql("insert into mysqlsink select a,cast(b as varchar) b from mySource"); 是可以运行的,但是出现唯一主键重复时会报错
> Caused by: java.sql.SQLIntegrityConstraintViolationException: Duplicate entry '1' for key 'PRIMARY'
>
>
>
Re: flinksql kafka 插入mysql 的insert语法问题和唯一主键的问题
Posted by 李奇 <35...@qq.com>.
改成update模式,然后也可以修改唯一主键为自然键
> 在 2020年7月31日,下午4:13,chenxuying <cx...@163.com> 写道:
>
> hi
> 我使用的flink 1.11.0版本
> 代码如下
> StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
> TableEnvironment tableEnvironment = StreamTableEnvironment.create(streamEnv);
> tableEnvironment.executeSql(" " +
> " CREATE TABLE mySource ( " +
> " a bigint, " +
> " b bigint " +
> " ) WITH ( " +
> " 'connector.type' = 'kafka', " +
> " 'connector.version' = 'universal', " +
> " 'connector.topic' = 'mytesttopic', " +
> " 'connector.properties.zookeeper.connect' = '172.17.0.2:2181', " +
> " 'connector.properties.bootstrap.servers' = '172.17.0.2:9092', " +
> " 'connector.properties.group.id' = 'flink-test-cxy', " +
> " 'connector.startup-mode' = 'latest-offset', " +
> " 'format.type' = 'json' " +
> " ) ");
> tableEnvironment.executeSql("CREATE TABLE mysqlsink ( " +
> " id bigint, " +
> " game_id varchar, " +
> " PRIMARY KEY (id) NOT ENFORCED " +
> " ) " +
> " with ( " +
> " 'connector.type' = 'jdbc', " +
> " 'connector.url' = 'jdbc:mysql://47.99.181.86:3306/flinksql?useSSL=false' , " +
> " 'connector.username' = 'root' , " +
> " 'connector.password' = 'root', " +
> " 'connector.table' = 'mysqlsink' , " +
> " 'connector.driver' = 'com.mysql.cj.jdbc.Driver' , " +
> " 'connector.write.flush.interval' = '2s', " +
> " 'connector.write.flush.max-rows' = '300' " +
> " )");
> tableEnvironment.executeSql("insert into mysqlsink (`id`,`game_id`) values (select a,cast(b as varchar) b from mySource)");
>
>
> 问题一 : 上面的insert语句会出现如下错误
> Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Cannot apply '$SCALAR_QUERY' to arguments of type '$SCALAR_QUERY(<RECORDTYPE(BIGINT A, VARCHAR(2147483647) B)>)'. Supported form(s): '$SCALAR_QUERY(<RECORDTYPE(SINGLE FIELD)>)'
>
>
> 问题二 : 如果insert改成 tableEnvironment.executeSql("insert into mysqlsink select a,cast(b as varchar) b from mySource"); 是可以运行的,但是出现唯一主键重复时会报错
> Caused by: java.sql.SQLIntegrityConstraintViolationException: Duplicate entry '1' for key 'PRIMARY'
>
>
>