You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by Jark Wu <im...@gmail.com> on 2020/04/28 03:15:45 UTC

Re: Re: FlinkSQL Upsert/Retraction 写入 MySQL 的问题

https://github.com/apache/flink/blob/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/dialect/JDBCDialects.java#L261

Best,
Jark

On Tue, 28 Apr 2020 at 10:24, wanglei2@geekplus.com.cn <
wanglei2@geekplus.com.cn> wrote:

> Thanks Leonard,
>
> JDBCUpsertTableSink 按照 Upsert 的方式处理,实际执行的 SQL 语句是 INSERT INTO  ON
> DUPLICATE KEY 吗?
> 这个在源代码哪个地方呢?
>
> 谢谢,
> 王磊
>
>
>
> wanglei2@geekplus.com.cn
>
>
> 发件人: Leonard Xu
> 发送时间: 2020-04-27 12:58
> 收件人: user-zh
> 主题: Re: FlinkSQL Upsert/Retraction 写入 MySQL 的问题
> Hi,wanglei
>
> > INSERT INTO  mysql_sink SELECT  f1, count(*) FROM kafka_src GROUP BY f1
> > 每从 kafka 过来一条新的记录,会生成两条记录 Tuple2<Row, Boolean>, 旧的被删除,新的会添加上。
> 这是query是会一个会产生retract stream的query,可以简单理解成每条kafka的数据过来会产生两条记录,但是最终写入下游的系统
> 需要看下游的系统支持和实现的sink(现在有三种sink AppendStreamSink, UpsertStreamSink,
> RetractStreamSink)
>
> > 我看
> https://github.com/apache/flink/tree/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc
> 没有 Retract 方式
> > 实际上使用了 JDBCUpsertTableSink.java 的代码写入 MySQL 吗?
> 现有的sink中,kafka是实现的AppendStreamSink,所以只支持insert 的记录,不支持retract.
> 你用DDL声明的mysql表,对应的jdbc sink 是JDBCUpsertTableSink,所以会按照Upsert的逻辑处理,
> 也不支持retract。
>
> > 如若不带 group by 直接:
> > INSERT INTO  mysql_sink SELECT  f1,  f2 FROM kafka_src
> > 主键冲突写入 mysql 是会出错的,怎么可以用 Upsert 的方式直接覆盖呢?
>
> 不带 group by时无法推导出query的 unique key,没法做按照unique key的更新,
> 只需要将 query的 key (你这里是group by 后的字段)和db中主键保持一致即可
>
> Best,
>
> Leonard Xu
>