You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by 1101300123 <hd...@163.com> on 2020/04/28 06:59:17 UTC

回复: FlinkSQL Upsert/Retraction 写入 MySQL 的问题

如果我的sink是mysql 支持主键索引,我可不可以理解处理逻辑是retract 和upsert是一样的;上游数据false标记的是失效的记录,我删除失效的或者更新失效的数据是无区别的;


其实我还是对retract流和upsert流有点疑问
https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/table/streaming/dynamic_tables.html



Append-only stream: A dynamic table that is only modified by INSERT changes can be converted into a stream by emitting the inserted rows.

Retract stream: A retract stream is a stream with two types of messages, add messages and retract messages. A dynamic table is converted into an retract stream by encoding an INSERT change as add message, a DELETE change as retract message, and an UPDATE change as a retract message for the updated (previous) row and an add message for the updating (new) row. The following figure visualizes the conversion of a dynamic table into a retract stream.





Upsert stream: An upsert stream is a stream with two types of messages, upsert messages and delete messages. A dynamic table that is converted into an upsert stream requires a (possibly composite) unique key. A dynamic table with unique key is converted into a stream by encoding INSERT and UPDATE changes as upsert messages and DELETE changes as delete messages. The stream consuming operator needs to be aware of the unique key attribute in order to apply messages correctly. The main difference to a retract stream is that UPDATE changes are encoded with a single message and hence more efficient. The following figure visualizes the conversion of a dynamic table into an upsert stream.





The API to convert a dynamic table into a DataStream is discussed on the Common Concepts page. Please note that only append and retract streams are supported when converting a dynamic table into a DataStream. The TableSink interface to emit a dynamic table to an external system are discussed on the TableSources and TableSinks page.

retract流做分组聚合,满足条件的group 字段数据来了,需要更新,发出一条false 标识的旧数据告诉下游这条数据失效了;

但是Upsert 是什么意思;An upsert stream is a stream with two types of messages, upsert messages and delete messages. A dynamic table that is converted into an upsert stream requires a (possibly composite) unique key. 有点不明白



在2020年4月27日 12:58,Leonard Xu<xb...@gmail.com> 写道:
Hi,wanglei

INSERT INTO  mysql_sink SELECT  f1, count(*) FROM kafka_src GROUP BY f1
每从 kafka 过来一条新的记录,会生成两条记录 Tuple2<Row, Boolean>, 旧的被删除,新的会添加上。
这是query是会一个会产生retract stream的query,可以简单理解成每条kafka的数据过来会产生两条记录,但是最终写入下游的系统
需要看下游的系统支持和实现的sink(现在有三种sink AppendStreamSink, UpsertStreamSink, RetractStreamSink)

我看 https://github.com/apache/flink/tree/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc 没有 Retract 方式
实际上使用了 JDBCUpsertTableSink.java 的代码写入 MySQL 吗?
现有的sink中,kafka是实现的AppendStreamSink,所以只支持insert 的记录,不支持retract.
你用DDL声明的mysql表,对应的jdbc sink 是JDBCUpsertTableSink,所以会按照Upsert的逻辑处理, 也不支持retract。

如若不带 group by 直接:
INSERT INTO  mysql_sink SELECT  f1,  f2 FROM kafka_src
主键冲突写入 mysql 是会出错的,怎么可以用 Upsert 的方式直接覆盖呢?

不带 group by时无法推导出query的 unique key,没法做按照unique key的更新,
只需要将 query的 key (你这里是group by 后的字段)和db中主键保持一致即可

Best,

Leonard Xu