You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by WeiXubin <18...@163.com> on 2021/06/09 08:50:55 UTC

FlinkSQL join 维表后一定会变成 upsert流吗?

请教各位一下,我使用 FlinkSQL 编写任务时,kafka source ->  MySQL sink  不设置主键,查看了一下 request
mode 是 [INSERT] ,也就是普通的 append 流,这很正常。

但是当我关联上维表后,发现 request mode 变成了 [INSERT, UPDATE_BEFORE, UPDATE_AFTER,
DELETE],这时异常报错会要求我给 sink 表设置主键,当我设置上主键后就会变成了 upsert 流。

upsert流底层实现原理是 INSERT INTO ... DUPLICATE KEY UPDATE,由于我采用的是阿里云的ADB数据库,该语法在
ADB 中主键是不支持update的,这会导致报错。且业务上我只想以 append 流的形式插入表。 

请问各位有什么好的解决方案吗,关联上维表但是还是保持 append流?

// 维表
CREATE TABLE DimTable (
   //省略字段
) WITH (
    'connector' = 'jdbc',
    'url' = '***********',
    'table-name' = 'v2_dim_game_id',
    'driver' = 'com.mysql.cj.jdbc.Driver',
    'username' = '*******',
    'password' = '*******',
    'lookup.cache.max-rows'='5000',
    'lookup.cache.ttl' = '60s',
    'lookup.max-retries'='3'
);



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Re:FlinkSQL join 维表后一定会变成 upsert流吗?

Posted by LakeShen <sh...@gmail.com>.
维保 Join 理论上不会改变流的模式,我理解原来你的流是什么,就是什么。

Best,
LakeShen

WeiXubin <18...@163.com> 于2021年6月10日周四 下午5:46写道:

> 感谢你的回答,我这边看了官网目前 join 一共可以分为 Regular Joins 、 Interval Joins 以及 Temporal
> Joins
> 三大类。 我上面问题所述的确是采用了 Regular Joins 的方式。 之后我也尝试使用了 Lookup Join 但发现其最后也是转为
> INSERT INTO ON DUPLICATE KEY UPDATE  的执行语句, 并不是我所期望的纯 append 模式
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Re:FlinkSQL join 维表后一定会变成 upsert流吗?

Posted by WeiXubin <18...@163.com>.
感谢你的回答,我这边看了官网目前 join 一共可以分为 Regular Joins 、 Interval Joins 以及 Temporal Joins
三大类。 我上面问题所述的确是采用了 Regular Joins 的方式。 之后我也尝试使用了 Lookup Join 但发现其最后也是转为
INSERT INTO ON DUPLICATE KEY UPDATE  的执行语句, 并不是我所期望的纯 append 模式



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re:FlinkSQL join 维表后一定会变成 upsert流吗?

Posted by 东东 <do...@163.com>.
你用的是regular join吧,那就必然会变成retract流啊,因为一旦右流有变化,就会将变更影响到的结果输出,不就会有retract么。



join的类型可以看一下文档


在 2021-06-09 16:50:55,"WeiXubin" <18...@163.com> 写道:
>请教各位一下,我使用 FlinkSQL 编写任务时,kafka source ->  MySQL sink  不设置主键,查看了一下 request
>mode 是 [INSERT] ,也就是普通的 append 流,这很正常。
>
>但是当我关联上维表后,发现 request mode 变成了 [INSERT, UPDATE_BEFORE, UPDATE_AFTER,
>DELETE],这时异常报错会要求我给 sink 表设置主键,当我设置上主键后就会变成了 upsert 流。
>
>upsert流底层实现原理是 INSERT INTO ... DUPLICATE KEY UPDATE,由于我采用的是阿里云的ADB数据库,该语法在
>ADB 中主键是不支持update的,这会导致报错。且业务上我只想以 append 流的形式插入表。 
>
>请问各位有什么好的解决方案吗,关联上维表但是还是保持 append流?
>
>// 维表
>CREATE TABLE DimTable (
>   //省略字段
>) WITH (
>    'connector' = 'jdbc',
>    'url' = '***********',
>    'table-name' = 'v2_dim_game_id',
>    'driver' = 'com.mysql.cj.jdbc.Driver',
>    'username' = '*******',
>    'password' = '*******',
>    'lookup.cache.max-rows'='5000',
>    'lookup.cache.ttl' = '60s',
>    'lookup.max-retries'='3'
>);
>
>
>
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/

Re: FlinkSQL join 维表后一定会变成 upsert流吗?

Posted by todd <to...@163.com>.
和你join使用的 left table RowKind 保持一致。



--
Sent from: http://apache-flink.147419.n8.nabble.com/