You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by "wanglei2@geekplus.com.cn" <wa...@geekplus.com.cn> on 2020/04/28 03:43:59 UTC

回复: 回复: FlinkSQL Upsert/Retraction 写入 MySQL 的问题

Thanks Leonard, 

JDBCUpsertTableSink 按照 Upsert 的方式处理,实际执行的 SQL 语句是 INSERT INTO  ON DUPLICATE KEY 吗? 
这个在源代码哪个地方呢?

谢谢,
王磊



wanglei2@geekplus.com.cn 

 
发件人: Leonard Xu
发送时间: 2020-04-27 12:58
收件人: user-zh
主题: Re: FlinkSQL Upsert/Retraction 写入 MySQL 的问题
Hi,wanglei
 
> INSERT INTO  mysql_sink SELECT  f1, count(*) FROM kafka_src GROUP BY f1
> 每从 kafka 过来一条新的记录,会生成两条记录 Tuple2<Row, Boolean>, 旧的被删除,新的会添加上。
这是query是会一个会产生retract stream的query,可以简单理解成每条kafka的数据过来会产生两条记录,但是最终写入下游的系统
需要看下游的系统支持和实现的sink(现在有三种sink AppendStreamSink, UpsertStreamSink, RetractStreamSink)
 
> 我看 https://github.com/apache/flink/tree/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc 没有 Retract 方式
> 实际上使用了 JDBCUpsertTableSink.java 的代码写入 MySQL 吗?
现有的sink中,kafka是实现的AppendStreamSink,所以只支持insert 的记录,不支持retract.
你用DDL声明的mysql表,对应的jdbc sink 是JDBCUpsertTableSink,所以会按照Upsert的逻辑处理, 也不支持retract。
 
> 如若不带 group by 直接:
> INSERT INTO  mysql_sink SELECT  f1,  f2 FROM kafka_src
> 主键冲突写入 mysql 是会出错的,怎么可以用 Upsert 的方式直接覆盖呢?
 
不带 group by时无法推导出query的 unique key,没法做按照unique key的更新,
只需要将 query的 key (你这里是group by 后的字段)和db中主键保持一致即可 
 
Best,
 
Leonard Xu

回复: 回复: 回复: FlinkSQL Upsert/Retraction 写入 MySQL 的问题

Posted by 1101300123 <hd...@163.com>.
结果的正确性问题?我不太明白。比如我的结果是
(true,(Mary,1))
(true,(Bob,1))
(false,(Mary,1))
(true,(Mary,2))
(true,(Liz,1))
(false,(Bob,1))
(true,(Bob,2))
我只做upsert好像没什么问题。针对retract流处理没问题;
其实我还是不太明白 upsert 流


在2020年4月28日 14:34,Jark Wu<im...@gmail.com> 写道:
是能提高一定的效率。不过可能会导致结果正确性问题。

Best,
Jark

On Tue, 28 Apr 2020 at 14:16, 1101300123 <hd...@163.com> wrote:

我知道,我的意思是,upsert自己实现了,我可以直接丢弃false的数据不做处理这样会不会提升部分效率


在2020年4月28日 14:11,Jark Wu<im...@gmail.com> 写道:
UpsertSink 仍然可能会收到 delete 消息的,所以你可以看到 UpsertSink 的输入是 Tuple2<Boolean, Row>
,当 false 时代表 delelte,true 时代表 upsert 消息。

Best,
Jark

On Tue, 28 Apr 2020 at 14:05, 1101300123 <hd...@163.com> wrote:

我看源码这样写道:
/**
* Get dialect upsert statement, the database has its own upsert syntax,
such as Mysql
* using DUPLICATE KEY UPDATE, and PostgresSQL using ON CONFLICT... DO
UPDATE SET..
*
* @return None if dialect does not support upsert statement, the writer
will degrade to
* the use of select + update/insert, this performance is poor.
*/
default Optional<String> getUpsertStatement(
String tableName, String[] fieldNames, String[] uniqueKeyFields) {
return Optional.empty();
}
不同的数据库产品有不同的语句,所以默认实现是delete +insert


但是我看
@Override
public void executeBatch() throws SQLException {
if (keyToRows.size() > 0) {
for (Map.Entry<Row, Tuple2<Boolean, Row>> entry : keyToRows.entrySet()) {
Row pk = entry.getKey();
Tuple2<Boolean, Row> tuple = entry.getValue();
if (tuple.f0) {
processOneRowInBatch(pk, tuple.f1);
} else {
setRecordToStatement(deleteStatement, pkTypes, pk);
deleteStatement.addBatch();
}
}
internalExecuteBatch();
deleteStatement.executeBatch();
keyToRows.clear();
}
}


方法中是有delete的,如果我自己实现了upset,是不是没有delete的必要也不用取false的记录
在2020年4月28日 11:43,wanglei2@geekplus.com.cn<wa...@geekplus.com.cn> 写道:

Thanks Leonard,

JDBCUpsertTableSink 按照 Upsert 的方式处理,实际执行的 SQL 语句是 INSERT INTO  ON
DUPLICATE KEY 吗?
这个在源代码哪个地方呢?

谢谢,
王磊



wanglei2@geekplus.com.cn


发件人: Leonard Xu
发送时间: 2020-04-27 12:58
收件人: user-zh
主题: Re: FlinkSQL Upsert/Retraction 写入 MySQL 的问题
Hi,wanglei

INSERT INTO  mysql_sink SELECT  f1, count(*) FROM kafka_src GROUP BY f1
每从 kafka 过来一条新的记录,会生成两条记录 Tuple2<Row, Boolean>, 旧的被删除,新的会添加上。
这是query是会一个会产生retract stream的query,可以简单理解成每条kafka的数据过来会产生两条记录,但是最终写入下游的系统
需要看下游的系统支持和实现的sink(现在有三种sink AppendStreamSink, UpsertStreamSink,
RetractStreamSink)

我看

https://github.com/apache/flink/tree/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc
没有 Retract 方式
实际上使用了 JDBCUpsertTableSink.java 的代码写入 MySQL 吗?
现有的sink中,kafka是实现的AppendStreamSink,所以只支持insert 的记录,不支持retract.
你用DDL声明的mysql表,对应的jdbc sink 是JDBCUpsertTableSink,所以会按照Upsert的逻辑处理,
也不支持retract。

如若不带 group by 直接:
INSERT INTO  mysql_sink SELECT  f1,  f2 FROM kafka_src
主键冲突写入 mysql 是会出错的,怎么可以用 Upsert 的方式直接覆盖呢?

不带 group by时无法推导出query的 unique key,没法做按照unique key的更新,
只需要将 query的 key (你这里是group by 后的字段)和db中主键保持一致即可

Best,

Leonard Xu



Re: 回复: 回复: FlinkSQL Upsert/Retraction 写入 MySQL 的问题

Posted by Jark Wu <im...@gmail.com>.
是能提高一定的效率。不过可能会导致结果正确性问题。

Best,
Jark

On Tue, 28 Apr 2020 at 14:16, 1101300123 <hd...@163.com> wrote:

> 我知道,我的意思是,upsert自己实现了,我可以直接丢弃false的数据不做处理这样会不会提升部分效率
>
>
> 在2020年4月28日 14:11,Jark Wu<im...@gmail.com> 写道:
> UpsertSink 仍然可能会收到 delete 消息的,所以你可以看到 UpsertSink 的输入是 Tuple2<Boolean, Row>
> ,当 false 时代表 delelte,true 时代表 upsert 消息。
>
> Best,
> Jark
>
> On Tue, 28 Apr 2020 at 14:05, 1101300123 <hd...@163.com> wrote:
>
> 我看源码这样写道:
> /**
> * Get dialect upsert statement, the database has its own upsert syntax,
> such as Mysql
> * using DUPLICATE KEY UPDATE, and PostgresSQL using ON CONFLICT... DO
> UPDATE SET..
> *
> * @return None if dialect does not support upsert statement, the writer
> will degrade to
> * the use of select + update/insert, this performance is poor.
> */
> default Optional<String> getUpsertStatement(
> String tableName, String[] fieldNames, String[] uniqueKeyFields) {
> return Optional.empty();
> }
> 不同的数据库产品有不同的语句,所以默认实现是delete +insert
>
>
> 但是我看
> @Override
> public void executeBatch() throws SQLException {
> if (keyToRows.size() > 0) {
> for (Map.Entry<Row, Tuple2<Boolean, Row>> entry : keyToRows.entrySet()) {
> Row pk = entry.getKey();
> Tuple2<Boolean, Row> tuple = entry.getValue();
> if (tuple.f0) {
> processOneRowInBatch(pk, tuple.f1);
> } else {
> setRecordToStatement(deleteStatement, pkTypes, pk);
> deleteStatement.addBatch();
> }
> }
> internalExecuteBatch();
> deleteStatement.executeBatch();
> keyToRows.clear();
> }
> }
>
>
> 方法中是有delete的,如果我自己实现了upset,是不是没有delete的必要也不用取false的记录
> 在2020年4月28日 11:43,wanglei2@geekplus.com.cn<wa...@geekplus.com.cn> 写道:
>
> Thanks Leonard,
>
> JDBCUpsertTableSink 按照 Upsert 的方式处理,实际执行的 SQL 语句是 INSERT INTO  ON
> DUPLICATE KEY 吗?
> 这个在源代码哪个地方呢?
>
> 谢谢,
> 王磊
>
>
>
> wanglei2@geekplus.com.cn
>
>
> 发件人: Leonard Xu
> 发送时间: 2020-04-27 12:58
> 收件人: user-zh
> 主题: Re: FlinkSQL Upsert/Retraction 写入 MySQL 的问题
> Hi,wanglei
>
> INSERT INTO  mysql_sink SELECT  f1, count(*) FROM kafka_src GROUP BY f1
> 每从 kafka 过来一条新的记录,会生成两条记录 Tuple2<Row, Boolean>, 旧的被删除,新的会添加上。
> 这是query是会一个会产生retract stream的query,可以简单理解成每条kafka的数据过来会产生两条记录,但是最终写入下游的系统
> 需要看下游的系统支持和实现的sink(现在有三种sink AppendStreamSink, UpsertStreamSink,
> RetractStreamSink)
>
> 我看
>
> https://github.com/apache/flink/tree/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc
> 没有 Retract 方式
> 实际上使用了 JDBCUpsertTableSink.java 的代码写入 MySQL 吗?
> 现有的sink中,kafka是实现的AppendStreamSink,所以只支持insert 的记录,不支持retract.
> 你用DDL声明的mysql表,对应的jdbc sink 是JDBCUpsertTableSink,所以会按照Upsert的逻辑处理,
> 也不支持retract。
>
> 如若不带 group by 直接:
> INSERT INTO  mysql_sink SELECT  f1,  f2 FROM kafka_src
> 主键冲突写入 mysql 是会出错的,怎么可以用 Upsert 的方式直接覆盖呢?
>
> 不带 group by时无法推导出query的 unique key,没法做按照unique key的更新,
> 只需要将 query的 key (你这里是group by 后的字段)和db中主键保持一致即可
>
> Best,
>
> Leonard Xu
>
>

回复: 回复: 回复: FlinkSQL Upsert/Retraction 写入 MySQL 的问题

Posted by 1101300123 <hd...@163.com>.
我知道,我的意思是,upsert自己实现了,我可以直接丢弃false的数据不做处理这样会不会提升部分效率


在2020年4月28日 14:11,Jark Wu<im...@gmail.com> 写道:
UpsertSink 仍然可能会收到 delete 消息的,所以你可以看到 UpsertSink 的输入是 Tuple2<Boolean, Row>
,当 false 时代表 delelte,true 时代表 upsert 消息。

Best,
Jark

On Tue, 28 Apr 2020 at 14:05, 1101300123 <hd...@163.com> wrote:

我看源码这样写道:
/**
* Get dialect upsert statement, the database has its own upsert syntax,
such as Mysql
* using DUPLICATE KEY UPDATE, and PostgresSQL using ON CONFLICT... DO
UPDATE SET..
*
* @return None if dialect does not support upsert statement, the writer
will degrade to
* the use of select + update/insert, this performance is poor.
*/
default Optional<String> getUpsertStatement(
String tableName, String[] fieldNames, String[] uniqueKeyFields) {
return Optional.empty();
}
不同的数据库产品有不同的语句,所以默认实现是delete +insert


但是我看
@Override
public void executeBatch() throws SQLException {
if (keyToRows.size() > 0) {
for (Map.Entry<Row, Tuple2<Boolean, Row>> entry : keyToRows.entrySet()) {
Row pk = entry.getKey();
Tuple2<Boolean, Row> tuple = entry.getValue();
if (tuple.f0) {
processOneRowInBatch(pk, tuple.f1);
} else {
setRecordToStatement(deleteStatement, pkTypes, pk);
deleteStatement.addBatch();
}
}
internalExecuteBatch();
deleteStatement.executeBatch();
keyToRows.clear();
}
}


方法中是有delete的,如果我自己实现了upset,是不是没有delete的必要也不用取false的记录
在2020年4月28日 11:43,wanglei2@geekplus.com.cn<wa...@geekplus.com.cn> 写道:

Thanks Leonard,

JDBCUpsertTableSink 按照 Upsert 的方式处理,实际执行的 SQL 语句是 INSERT INTO  ON
DUPLICATE KEY 吗?
这个在源代码哪个地方呢?

谢谢,
王磊



wanglei2@geekplus.com.cn


发件人: Leonard Xu
发送时间: 2020-04-27 12:58
收件人: user-zh
主题: Re: FlinkSQL Upsert/Retraction 写入 MySQL 的问题
Hi,wanglei

INSERT INTO  mysql_sink SELECT  f1, count(*) FROM kafka_src GROUP BY f1
每从 kafka 过来一条新的记录,会生成两条记录 Tuple2<Row, Boolean>, 旧的被删除,新的会添加上。
这是query是会一个会产生retract stream的query,可以简单理解成每条kafka的数据过来会产生两条记录,但是最终写入下游的系统
需要看下游的系统支持和实现的sink(现在有三种sink AppendStreamSink, UpsertStreamSink,
RetractStreamSink)

我看
https://github.com/apache/flink/tree/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc
没有 Retract 方式
实际上使用了 JDBCUpsertTableSink.java 的代码写入 MySQL 吗?
现有的sink中,kafka是实现的AppendStreamSink,所以只支持insert 的记录,不支持retract.
你用DDL声明的mysql表,对应的jdbc sink 是JDBCUpsertTableSink,所以会按照Upsert的逻辑处理,
也不支持retract。

如若不带 group by 直接:
INSERT INTO  mysql_sink SELECT  f1,  f2 FROM kafka_src
主键冲突写入 mysql 是会出错的,怎么可以用 Upsert 的方式直接覆盖呢?

不带 group by时无法推导出query的 unique key,没法做按照unique key的更新,
只需要将 query的 key (你这里是group by 后的字段)和db中主键保持一致即可

Best,

Leonard Xu


Re: 回复: 回复: FlinkSQL Upsert/Retraction 写入 MySQL 的问题

Posted by Jark Wu <im...@gmail.com>.
UpsertSink 仍然可能会收到 delete 消息的,所以你可以看到 UpsertSink 的输入是 Tuple2<Boolean, Row>
,当 false 时代表 delelte,true 时代表 upsert 消息。

Best,
Jark

On Tue, 28 Apr 2020 at 14:05, 1101300123 <hd...@163.com> wrote:

> 我看源码这样写道:
> /**
>  * Get dialect upsert statement, the database has its own upsert syntax,
> such as Mysql
>  * using DUPLICATE KEY UPDATE, and PostgresSQL using ON CONFLICT... DO
> UPDATE SET..
>  *
>  * @return None if dialect does not support upsert statement, the writer
> will degrade to
>  * the use of select + update/insert, this performance is poor.
>  */
> default Optional<String> getUpsertStatement(
>       String tableName, String[] fieldNames, String[] uniqueKeyFields) {
> return Optional.empty();
> }
> 不同的数据库产品有不同的语句,所以默认实现是delete +insert
>
>
> 但是我看
> @Override
> public void executeBatch() throws SQLException {
> if (keyToRows.size() > 0) {
> for (Map.Entry<Row, Tuple2<Boolean, Row>> entry : keyToRows.entrySet()) {
>          Row pk = entry.getKey();
> Tuple2<Boolean, Row> tuple = entry.getValue();
>          if (tuple.f0) {
>             processOneRowInBatch(pk, tuple.f1);
> } else {
> setRecordToStatement(deleteStatement, pkTypes, pk);
> deleteStatement.addBatch();
> }
>       }
>       internalExecuteBatch();
> deleteStatement.executeBatch();
> keyToRows.clear();
> }
> }
>
>
> 方法中是有delete的,如果我自己实现了upset,是不是没有delete的必要也不用取false的记录
> 在2020年4月28日 11:43,wanglei2@geekplus.com.cn<wa...@geekplus.com.cn> 写道:
>
> Thanks Leonard,
>
> JDBCUpsertTableSink 按照 Upsert 的方式处理,实际执行的 SQL 语句是 INSERT INTO  ON
> DUPLICATE KEY 吗?
> 这个在源代码哪个地方呢?
>
> 谢谢,
> 王磊
>
>
>
> wanglei2@geekplus.com.cn
>
>
> 发件人: Leonard Xu
> 发送时间: 2020-04-27 12:58
> 收件人: user-zh
> 主题: Re: FlinkSQL Upsert/Retraction 写入 MySQL 的问题
> Hi,wanglei
>
> INSERT INTO  mysql_sink SELECT  f1, count(*) FROM kafka_src GROUP BY f1
> 每从 kafka 过来一条新的记录,会生成两条记录 Tuple2<Row, Boolean>, 旧的被删除,新的会添加上。
> 这是query是会一个会产生retract stream的query,可以简单理解成每条kafka的数据过来会产生两条记录,但是最终写入下游的系统
> 需要看下游的系统支持和实现的sink(现在有三种sink AppendStreamSink, UpsertStreamSink,
> RetractStreamSink)
>
> 我看
> https://github.com/apache/flink/tree/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc
> 没有 Retract 方式
> 实际上使用了 JDBCUpsertTableSink.java 的代码写入 MySQL 吗?
> 现有的sink中,kafka是实现的AppendStreamSink,所以只支持insert 的记录,不支持retract.
> 你用DDL声明的mysql表,对应的jdbc sink 是JDBCUpsertTableSink,所以会按照Upsert的逻辑处理,
> 也不支持retract。
>
> 如若不带 group by 直接:
> INSERT INTO  mysql_sink SELECT  f1,  f2 FROM kafka_src
> 主键冲突写入 mysql 是会出错的,怎么可以用 Upsert 的方式直接覆盖呢?
>
> 不带 group by时无法推导出query的 unique key,没法做按照unique key的更新,
> 只需要将 query的 key (你这里是group by 后的字段)和db中主键保持一致即可
>
> Best,
>
> Leonard Xu
>

回复:回复: 回复: FlinkSQL Upsert/Retraction 写入 MySQL 的问题

Posted by 1101300123 <hd...@163.com>.
我看源码这样写道:
/**
 * Get dialect upsert statement, the database has its own upsert syntax, such as Mysql
 * using DUPLICATE KEY UPDATE, and PostgresSQL using ON CONFLICT... DO UPDATE SET..
 *
 * @return None if dialect does not support upsert statement, the writer will degrade to
 * the use of select + update/insert, this performance is poor.
 */
default Optional<String> getUpsertStatement(
      String tableName, String[] fieldNames, String[] uniqueKeyFields) {
return Optional.empty();
}
不同的数据库产品有不同的语句,所以默认实现是delete +insert


但是我看
@Override
public void executeBatch() throws SQLException {
if (keyToRows.size() > 0) {
for (Map.Entry<Row, Tuple2<Boolean, Row>> entry : keyToRows.entrySet()) {
         Row pk = entry.getKey();
Tuple2<Boolean, Row> tuple = entry.getValue();
         if (tuple.f0) {
            processOneRowInBatch(pk, tuple.f1);
} else {
setRecordToStatement(deleteStatement, pkTypes, pk);
deleteStatement.addBatch();
}
      }
      internalExecuteBatch();
deleteStatement.executeBatch();
keyToRows.clear();
}
}


方法中是有delete的,如果我自己实现了upset,是不是没有delete的必要也不用取false的记录
在2020年4月28日 11:43,wanglei2@geekplus.com.cn<wa...@geekplus.com.cn> 写道:

Thanks Leonard,

JDBCUpsertTableSink 按照 Upsert 的方式处理,实际执行的 SQL 语句是 INSERT INTO  ON DUPLICATE KEY 吗?
这个在源代码哪个地方呢?

谢谢,
王磊



wanglei2@geekplus.com.cn


发件人: Leonard Xu
发送时间: 2020-04-27 12:58
收件人: user-zh
主题: Re: FlinkSQL Upsert/Retraction 写入 MySQL 的问题
Hi,wanglei

INSERT INTO  mysql_sink SELECT  f1, count(*) FROM kafka_src GROUP BY f1
每从 kafka 过来一条新的记录,会生成两条记录 Tuple2<Row, Boolean>, 旧的被删除,新的会添加上。
这是query是会一个会产生retract stream的query,可以简单理解成每条kafka的数据过来会产生两条记录,但是最终写入下游的系统
需要看下游的系统支持和实现的sink(现在有三种sink AppendStreamSink, UpsertStreamSink, RetractStreamSink)

我看 https://github.com/apache/flink/tree/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc 没有 Retract 方式
实际上使用了 JDBCUpsertTableSink.java 的代码写入 MySQL 吗?
现有的sink中,kafka是实现的AppendStreamSink,所以只支持insert 的记录,不支持retract.
你用DDL声明的mysql表,对应的jdbc sink 是JDBCUpsertTableSink,所以会按照Upsert的逻辑处理, 也不支持retract。

如若不带 group by 直接:
INSERT INTO  mysql_sink SELECT  f1,  f2 FROM kafka_src
主键冲突写入 mysql 是会出错的,怎么可以用 Upsert 的方式直接覆盖呢?

不带 group by时无法推导出query的 unique key,没法做按照unique key的更新,
只需要将 query的 key (你这里是group by 后的字段)和db中主键保持一致即可

Best,

Leonard Xu