You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by "wanglei2@geekplus.com.cn" <wa...@geekplus.com.cn> on 2020/04/30 09:08:22 UTC
FlinkSQL Retraction 问题原理咨询
自己实现了一下 https://yq.aliyun.com/articles/457392/ 菜鸟物流订单统计的例子,读 kafka 写到 RDS, RDS 表没有主键,也没有唯一键。
INSERT INTO table_out select tms_company, count(distinct order_id) as order_cnt from
(select order_id, LAST_VALUE(tms_company) AS tms_company from dwd_table group by order_id)
group by tms_company;
总共发送了 4 条消息,顺序如下:
1 {"order_id":1,"tms_company":"zhongtong"} 数据库1条记录: zhongtong 1
2 {"order_id":1,"tms_company":"yuantong"} 数据库1条记录: yuantong 1 (上一条记录被删除了)
3 {"order_id":2,"tms_company":"yuantong"} 数据库2条记录: yuantong 1, yuantong 2 (增加了条记录,没有删除)
4 {"order_id":2,"tms_company":"zhongtong"} 数据库4条记录: yuantong 1, yuantong 2, yuantong 1, zhongtong 1 (增加了两条记录,没有删除)
问题一:
第 2 条消息发送后,数据库的上一条记录被删除了。我的理解这应该是 RetractStream 的功能。当我看源码 https://github.com/apache/flink/tree/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc 这里也没有 RetractionStream 的实现,哪里的代码把他删除了呢?
问题二:
第 3 条记录来了后,直接在数据库增加了 一条 yuantong 2, 为什么没把 yuantong 1, 删除呢?
第 4 条记录来了后,又在数据库里加了两条记录,为什么也没有删除之前的呢?
谢谢,
王磊
wanglei2@geekplus.com.cn
Re:回复: Re:FlinkSQL Retraction 问题原理咨询
Posted by Michael Ran <gr...@163.com>.
1.flink 状态或者内存维护了所有结果。
2.当group by count 结果值(tms_company=1),新来一条记录变成(tms_company=2)
tms_company=1 (旧,false)
tms_company=2 (新,true)
3. 内存里面就会把旧的舍弃掉,用新的参与后续计算
4.如果存储(mysql 之类的),会生成对应的SQL 进行更新掉
在 2020-05-06 10:36:35,"wanglei2@geekplus.com.cn" <wa...@geekplus.com.cn> 写道:
>
>更新键是 tms_company, 但这是通过双层的 group 实现了回退功能,总共就 两个 order_id, order_id 对应的 tms_company 是有变化的。
>我不是很明白这种回退的具体原理,为什么有的会删除,有的就没有删除。
>
>
>
>
>wanglei2@geekplus.com.cn
>
>发件人: Michael Ran
>发送时间: 2020-04-30 17:23
>收件人: user-zh
>主题: Re:FlinkSQL Retraction 问题原理咨询
>
>
>
>指定的更新键是tms_company?
>
>
>结果是:
>yuantong:2
>zhongtong:2
>
>
>
>
>
>
>
>
>
>
>
>
>在 2020-04-30 17:08:22,"wanglei2@geekplus.com.cn" <wa...@geekplus.com.cn> 写道:
>>
>>自己实现了一下 https://yq.aliyun.com/articles/457392/ 菜鸟物流订单统计的例子,读 kafka 写到 RDS, RDS 表没有主键,也没有唯一键。
>>
>>INSERT INTO table_out select tms_company, count(distinct order_id) as order_cnt from
>> (select order_id, LAST_VALUE(tms_company) AS tms_company from dwd_table group by order_id)
>> group by tms_company;
>>
>>
>>总共发送了 4 条消息,顺序如下:
>>
>>1 {"order_id":1,"tms_company":"zhongtong"} 数据库1条记录: zhongtong 1
>>
>>2 {"order_id":1,"tms_company":"yuantong"} 数据库1条记录: yuantong 1 (上一条记录被删除了)
>>
>>3 {"order_id":2,"tms_company":"yuantong"} 数据库2条记录: yuantong 1, yuantong 2 (增加了条记录,没有删除)
>>
>>4 {"order_id":2,"tms_company":"zhongtong"} 数据库4条记录: yuantong 1, yuantong 2, yuantong 1, zhongtong 1 (增加了两条记录,没有删除)
>>
>>
>>问题一:
>> 第 2 条消息发送后,数据库的上一条记录被删除了。我的理解这应该是 RetractStream 的功能。当我看源码 https://github.com/apache/flink/tree/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc 这里也没有 RetractionStream 的实现,哪里的代码把他删除了呢?
>>
>>问题二:
>> 第 3 条记录来了后,直接在数据库增加了 一条 yuantong 2, 为什么没把 yuantong 1, 删除呢?
>> 第 4 条记录来了后,又在数据库里加了两条记录,为什么也没有删除之前的呢?
>>
>>谢谢,
>>王磊
>>
>>
>>
>>wanglei2@geekplus.com.cn
Re: Re:FlinkSQL Retraction 问题原理咨询
Posted by Jingsong Li <ji...@gmail.com>.
Hi,
> sink 表中没有任何主键或唯一键
这个时候更合理的方式应该是抛出异常,不过实现上可能有些不好搞
> 回撤导致的结果变成 0 ,就会执行 delete , 否则就是update
你理解的完全正确
Best
Jingsong Lee
On Wed, May 6, 2020 at 12:39 PM wanglei2@geekplus.com.cn <
wanglei2@geekplus.com.cn> wrote:
>
> Thanks Jingsong Lee.
>
> 我用的是 MySQL,sink 表中没有任何主键或唯一键.
> 如果 sink 表设主键或唯一键,确实能达到只保留两条记录的效果。
>
> 我把 flink sql-client 客户端设置 SET execution.result-mode=changelog
> 试验了下,左边标上了是第几条 kafka 消息导致的行为:
>
> +/- tms_company order_cnt 1 + zhongtong 1 2 - zhongtong 1 2 + yuantong 1
> 3 - yuantong 1 3 + yuantong 2 4 - yuantong 2 4 + yuantong 1 4 + zhongtong 1
>
> 第1条消息:执行一个 INSERT
> 第2条消息:执行了 一个 DELETE, 一个 INSERT
> 第3条消息:执行了一个 INSERT ON DUPLICATE UPDATE
> 第4条消息:执行了两个 INSERT ON DUPLICATE UPDATE
>
>
> 我总结这个逻辑应该是如果 回撤导致的结果变成 0 ,就会执行 delete , 否则就是 INSERT ON DUPLICATE UPDATE
>
> 不知道我这样理解是否正确。
>
> 谢谢,
> 王磊
>
>
>
>
> wanglei2@geekplus.com.cn
>
> Sender: Jingsong Li
> Send Time: 2020-05-06 11:35
> Receiver: user-zh
> Subject: Re: Re:FlinkSQL Retraction 问题原理咨询
> Hi,
>
> 问题一:删除数据可不单单只是retract stream的功能。upsert
> stream是当下游具有按key覆盖的功能时的特殊优化,除了按key覆盖外,它也需要在上游retract时删除数据,意思是upsert
> stream也有retract的input数据的。JDBC实现的是upsert stream的消费。
>
> 问题二:正确数据应该是:
> 1 {"order_id":1,"tms_company":"zhongtong"} 数据库1条记录: zhongtong 1
> 2 {"order_id":1,"tms_company":"yuantong"} 数据库1条记录: yuantong 1 ( 删除
> zhongtong 1)
> 3 {"order_id":2,"tms_company":"yuantong"} 数据库1条记录: yuantong 2 (
> 删除yuantong 1)
> 4 {"order_id":2,"tms_company":"zhongtong"} 数据库2条记录: yuantong 1,
> zhongtong 1 ( 删除yuantong 2)
>
> 你用了什么dialect?是不是mysql?
> Flink JDBC的Mysql用了DUPLICATE KEY UPDATE的语法来更新数据。
> 看起来这个语法在RDS没有建主键或者唯一键时可能不会去覆盖老数据?尝试创建下主键或唯一建?
>
> Best,
> Jingsong Lee
>
> On Wed, May 6, 2020 at 10:36 AM wanglei2@geekplus.com.cn <
> wanglei2@geekplus.com.cn> wrote:
>
> >
> > 更新键是 tms_company, 但这是通过双层的 group 实现了回退功能,总共就 两个 order_id, order_id 对应的
> > tms_company 是有变化的。
> > 我不是很明白这种回退的具体原理,为什么有的会删除,有的就没有删除。
> >
> >
> >
> >
> > wanglei2@geekplus.com.cn
> >
> > 发件人: Michael Ran
> > 发送时间: 2020-04-30 17:23
> > 收件人: user-zh
> > 主题: Re:FlinkSQL Retraction 问题原理咨询
> >
> >
> >
> > 指定的更新键是tms_company?
> >
> >
> > 结果是:
> > yuantong:2
> > zhongtong:2
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> > 在 2020-04-30 17:08:22,"wanglei2@geekplus.com.cn" <
> wanglei2@geekplus.com.cn>
> > 写道:
> > >
> > >自己实现了一下 https://yq.aliyun.com/articles/457392/ 菜鸟物流订单统计的例子,读 kafka 写到
> > RDS, RDS 表没有主键,也没有唯一键。
> > >
> > >INSERT INTO table_out select tms_company, count(distinct order_id) as
> > order_cnt from
> > > (select order_id, LAST_VALUE(tms_company) AS tms_company from
> > dwd_table group by order_id)
> > > group by tms_company;
> > >
> > >
> > >总共发送了 4 条消息,顺序如下:
> > >
> > >1 {"order_id":1,"tms_company":"zhongtong"} 数据库1条记录: zhongtong 1
> > >
> > >2 {"order_id":1,"tms_company":"yuantong"} 数据库1条记录: yuantong 1
> > (上一条记录被删除了)
> > >
> > >3 {"order_id":2,"tms_company":"yuantong"} 数据库2条记录: yuantong 1,
> > yuantong 2 (增加了条记录,没有删除)
> > >
> > >4 {"order_id":2,"tms_company":"zhongtong"} 数据库4条记录: yuantong 1,
> > yuantong 2, yuantong 1, zhongtong 1 (增加了两条记录,没有删除)
> > >
> > >
> > >问题一:
> > > 第 2 条消息发送后,数据库的上一条记录被删除了。我的理解这应该是 RetractStream 的功能。当我看源码
> >
> https://github.com/apache/flink/tree/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc
> > 这里也没有 RetractionStream 的实现,哪里的代码把他删除了呢?
> > >
> > >问题二:
> > > 第 3 条记录来了后,直接在数据库增加了 一条 yuantong 2, 为什么没把 yuantong 1, 删除呢?
> > > 第 4 条记录来了后,又在数据库里加了两条记录,为什么也没有删除之前的呢?
> > >
> > >谢谢,
> > >王磊
> > >
> > >
> > >
> > >wanglei2@geekplus.com.cn
> >
>
>
> --
> Best, Jingsong Lee
>
--
Best, Jingsong Lee
Re: Re:FlinkSQL Retraction 问题原理咨询
Posted by "wanglei2@geekplus.com.cn" <wa...@geekplus.com.cn>.
Thanks Jingsong Lee.
我用的是 MySQL,sink 表中没有任何主键或唯一键.
如果 sink 表设主键或唯一键,确实能达到只保留两条记录的效果。
我把 flink sql-client 客户端设置 SET execution.result-mode=changelog 试验了下,左边标上了是第几条 kafka 消息导致的行为:
+/- tms_company order_cnt 1 + zhongtong 1 2 - zhongtong 1 2 + yuantong 1 3 - yuantong 1 3 + yuantong 2 4 - yuantong 2 4 + yuantong 1 4 + zhongtong 1
第1条消息:执行一个 INSERT
第2条消息:执行了 一个 DELETE, 一个 INSERT
第3条消息:执行了一个 INSERT ON DUPLICATE UPDATE
第4条消息:执行了两个 INSERT ON DUPLICATE UPDATE
我总结这个逻辑应该是如果 回撤导致的结果变成 0 ,就会执行 delete , 否则就是 INSERT ON DUPLICATE UPDATE
不知道我这样理解是否正确。
谢谢,
王磊
wanglei2@geekplus.com.cn
Sender: Jingsong Li
Send Time: 2020-05-06 11:35
Receiver: user-zh
Subject: Re: Re:FlinkSQL Retraction 问题原理咨询
Hi,
问题一:删除数据可不单单只是retract stream的功能。upsert
stream是当下游具有按key覆盖的功能时的特殊优化,除了按key覆盖外,它也需要在上游retract时删除数据,意思是upsert
stream也有retract的input数据的。JDBC实现的是upsert stream的消费。
问题二:正确数据应该是:
1 {"order_id":1,"tms_company":"zhongtong"} 数据库1条记录: zhongtong 1
2 {"order_id":1,"tms_company":"yuantong"} 数据库1条记录: yuantong 1 ( 删除
zhongtong 1)
3 {"order_id":2,"tms_company":"yuantong"} 数据库1条记录: yuantong 2 (
删除yuantong 1)
4 {"order_id":2,"tms_company":"zhongtong"} 数据库2条记录: yuantong 1,
zhongtong 1 ( 删除yuantong 2)
你用了什么dialect?是不是mysql?
Flink JDBC的Mysql用了DUPLICATE KEY UPDATE的语法来更新数据。
看起来这个语法在RDS没有建主键或者唯一键时可能不会去覆盖老数据?尝试创建下主键或唯一建?
Best,
Jingsong Lee
On Wed, May 6, 2020 at 10:36 AM wanglei2@geekplus.com.cn <
wanglei2@geekplus.com.cn> wrote:
>
> 更新键是 tms_company, 但这是通过双层的 group 实现了回退功能,总共就 两个 order_id, order_id 对应的
> tms_company 是有变化的。
> 我不是很明白这种回退的具体原理,为什么有的会删除,有的就没有删除。
>
>
>
>
> wanglei2@geekplus.com.cn
>
> 发件人: Michael Ran
> 发送时间: 2020-04-30 17:23
> 收件人: user-zh
> 主题: Re:FlinkSQL Retraction 问题原理咨询
>
>
>
> 指定的更新键是tms_company?
>
>
> 结果是:
> yuantong:2
> zhongtong:2
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-04-30 17:08:22,"wanglei2@geekplus.com.cn" <wa...@geekplus.com.cn>
> 写道:
> >
> >自己实现了一下 https://yq.aliyun.com/articles/457392/ 菜鸟物流订单统计的例子,读 kafka 写到
> RDS, RDS 表没有主键,也没有唯一键。
> >
> >INSERT INTO table_out select tms_company, count(distinct order_id) as
> order_cnt from
> > (select order_id, LAST_VALUE(tms_company) AS tms_company from
> dwd_table group by order_id)
> > group by tms_company;
> >
> >
> >总共发送了 4 条消息,顺序如下:
> >
> >1 {"order_id":1,"tms_company":"zhongtong"} 数据库1条记录: zhongtong 1
> >
> >2 {"order_id":1,"tms_company":"yuantong"} 数据库1条记录: yuantong 1
> (上一条记录被删除了)
> >
> >3 {"order_id":2,"tms_company":"yuantong"} 数据库2条记录: yuantong 1,
> yuantong 2 (增加了条记录,没有删除)
> >
> >4 {"order_id":2,"tms_company":"zhongtong"} 数据库4条记录: yuantong 1,
> yuantong 2, yuantong 1, zhongtong 1 (增加了两条记录,没有删除)
> >
> >
> >问题一:
> > 第 2 条消息发送后,数据库的上一条记录被删除了。我的理解这应该是 RetractStream 的功能。当我看源码
> https://github.com/apache/flink/tree/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc
> 这里也没有 RetractionStream 的实现,哪里的代码把他删除了呢?
> >
> >问题二:
> > 第 3 条记录来了后,直接在数据库增加了 一条 yuantong 2, 为什么没把 yuantong 1, 删除呢?
> > 第 4 条记录来了后,又在数据库里加了两条记录,为什么也没有删除之前的呢?
> >
> >谢谢,
> >王磊
> >
> >
> >
> >wanglei2@geekplus.com.cn
>
--
Best, Jingsong Lee
Re: Re:FlinkSQL Retraction 问题原理咨询
Posted by Jingsong Li <ji...@gmail.com>.
Hi,
问题一:删除数据可不单单只是retract stream的功能。upsert
stream是当下游具有按key覆盖的功能时的特殊优化,除了按key覆盖外,它也需要在上游retract时删除数据,意思是upsert
stream也有retract的input数据的。JDBC实现的是upsert stream的消费。
问题二:正确数据应该是:
1 {"order_id":1,"tms_company":"zhongtong"} 数据库1条记录: zhongtong 1
2 {"order_id":1,"tms_company":"yuantong"} 数据库1条记录: yuantong 1 ( 删除
zhongtong 1)
3 {"order_id":2,"tms_company":"yuantong"} 数据库1条记录: yuantong 2 (
删除yuantong 1)
4 {"order_id":2,"tms_company":"zhongtong"} 数据库2条记录: yuantong 1,
zhongtong 1 ( 删除yuantong 2)
你用了什么dialect?是不是mysql?
Flink JDBC的Mysql用了DUPLICATE KEY UPDATE的语法来更新数据。
看起来这个语法在RDS没有建主键或者唯一键时可能不会去覆盖老数据?尝试创建下主键或唯一建?
Best,
Jingsong Lee
On Wed, May 6, 2020 at 10:36 AM wanglei2@geekplus.com.cn <
wanglei2@geekplus.com.cn> wrote:
>
> 更新键是 tms_company, 但这是通过双层的 group 实现了回退功能,总共就 两个 order_id, order_id 对应的
> tms_company 是有变化的。
> 我不是很明白这种回退的具体原理,为什么有的会删除,有的就没有删除。
>
>
>
>
> wanglei2@geekplus.com.cn
>
> 发件人: Michael Ran
> 发送时间: 2020-04-30 17:23
> 收件人: user-zh
> 主题: Re:FlinkSQL Retraction 问题原理咨询
>
>
>
> 指定的更新键是tms_company?
>
>
> 结果是:
> yuantong:2
> zhongtong:2
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-04-30 17:08:22,"wanglei2@geekplus.com.cn" <wa...@geekplus.com.cn>
> 写道:
> >
> >自己实现了一下 https://yq.aliyun.com/articles/457392/ 菜鸟物流订单统计的例子,读 kafka 写到
> RDS, RDS 表没有主键,也没有唯一键。
> >
> >INSERT INTO table_out select tms_company, count(distinct order_id) as
> order_cnt from
> > (select order_id, LAST_VALUE(tms_company) AS tms_company from
> dwd_table group by order_id)
> > group by tms_company;
> >
> >
> >总共发送了 4 条消息,顺序如下:
> >
> >1 {"order_id":1,"tms_company":"zhongtong"} 数据库1条记录: zhongtong 1
> >
> >2 {"order_id":1,"tms_company":"yuantong"} 数据库1条记录: yuantong 1
> (上一条记录被删除了)
> >
> >3 {"order_id":2,"tms_company":"yuantong"} 数据库2条记录: yuantong 1,
> yuantong 2 (增加了条记录,没有删除)
> >
> >4 {"order_id":2,"tms_company":"zhongtong"} 数据库4条记录: yuantong 1,
> yuantong 2, yuantong 1, zhongtong 1 (增加了两条记录,没有删除)
> >
> >
> >问题一:
> > 第 2 条消息发送后,数据库的上一条记录被删除了。我的理解这应该是 RetractStream 的功能。当我看源码
> https://github.com/apache/flink/tree/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc
> 这里也没有 RetractionStream 的实现,哪里的代码把他删除了呢?
> >
> >问题二:
> > 第 3 条记录来了后,直接在数据库增加了 一条 yuantong 2, 为什么没把 yuantong 1, 删除呢?
> > 第 4 条记录来了后,又在数据库里加了两条记录,为什么也没有删除之前的呢?
> >
> >谢谢,
> >王磊
> >
> >
> >
> >wanglei2@geekplus.com.cn
>
--
Best, Jingsong Lee
回复: Re:FlinkSQL Retraction 问题原理咨询
Posted by "wanglei2@geekplus.com.cn" <wa...@geekplus.com.cn>.
更新键是 tms_company, 但这是通过双层的 group 实现了回退功能,总共就 两个 order_id, order_id 对应的 tms_company 是有变化的。
我不是很明白这种回退的具体原理,为什么有的会删除,有的就没有删除。
wanglei2@geekplus.com.cn
发件人: Michael Ran
发送时间: 2020-04-30 17:23
收件人: user-zh
主题: Re:FlinkSQL Retraction 问题原理咨询
指定的更新键是tms_company?
结果是:
yuantong:2
zhongtong:2
在 2020-04-30 17:08:22,"wanglei2@geekplus.com.cn" <wa...@geekplus.com.cn> 写道:
>
>自己实现了一下 https://yq.aliyun.com/articles/457392/ 菜鸟物流订单统计的例子,读 kafka 写到 RDS, RDS 表没有主键,也没有唯一键。
>
>INSERT INTO table_out select tms_company, count(distinct order_id) as order_cnt from
> (select order_id, LAST_VALUE(tms_company) AS tms_company from dwd_table group by order_id)
> group by tms_company;
>
>
>总共发送了 4 条消息,顺序如下:
>
>1 {"order_id":1,"tms_company":"zhongtong"} 数据库1条记录: zhongtong 1
>
>2 {"order_id":1,"tms_company":"yuantong"} 数据库1条记录: yuantong 1 (上一条记录被删除了)
>
>3 {"order_id":2,"tms_company":"yuantong"} 数据库2条记录: yuantong 1, yuantong 2 (增加了条记录,没有删除)
>
>4 {"order_id":2,"tms_company":"zhongtong"} 数据库4条记录: yuantong 1, yuantong 2, yuantong 1, zhongtong 1 (增加了两条记录,没有删除)
>
>
>问题一:
> 第 2 条消息发送后,数据库的上一条记录被删除了。我的理解这应该是 RetractStream 的功能。当我看源码 https://github.com/apache/flink/tree/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc 这里也没有 RetractionStream 的实现,哪里的代码把他删除了呢?
>
>问题二:
> 第 3 条记录来了后,直接在数据库增加了 一条 yuantong 2, 为什么没把 yuantong 1, 删除呢?
> 第 4 条记录来了后,又在数据库里加了两条记录,为什么也没有删除之前的呢?
>
>谢谢,
>王磊
>
>
>
>wanglei2@geekplus.com.cn
Re:FlinkSQL Retraction 问题原理咨询
Posted by Michael Ran <gr...@163.com>.
指定的更新键是tms_company?
结果是:
yuantong:2
zhongtong:2
在 2020-04-30 17:08:22,"wanglei2@geekplus.com.cn" <wa...@geekplus.com.cn> 写道:
>
>自己实现了一下 https://yq.aliyun.com/articles/457392/ 菜鸟物流订单统计的例子,读 kafka 写到 RDS, RDS 表没有主键,也没有唯一键。
>
>INSERT INTO table_out select tms_company, count(distinct order_id) as order_cnt from
> (select order_id, LAST_VALUE(tms_company) AS tms_company from dwd_table group by order_id)
> group by tms_company;
>
>
>总共发送了 4 条消息,顺序如下:
>
>1 {"order_id":1,"tms_company":"zhongtong"} 数据库1条记录: zhongtong 1
>
>2 {"order_id":1,"tms_company":"yuantong"} 数据库1条记录: yuantong 1 (上一条记录被删除了)
>
>3 {"order_id":2,"tms_company":"yuantong"} 数据库2条记录: yuantong 1, yuantong 2 (增加了条记录,没有删除)
>
>4 {"order_id":2,"tms_company":"zhongtong"} 数据库4条记录: yuantong 1, yuantong 2, yuantong 1, zhongtong 1 (增加了两条记录,没有删除)
>
>
>问题一:
> 第 2 条消息发送后,数据库的上一条记录被删除了。我的理解这应该是 RetractStream 的功能。当我看源码 https://github.com/apache/flink/tree/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc 这里也没有 RetractionStream 的实现,哪里的代码把他删除了呢?
>
>问题二:
> 第 3 条记录来了后,直接在数据库增加了 一条 yuantong 2, 为什么没把 yuantong 1, 删除呢?
> 第 4 条记录来了后,又在数据库里加了两条记录,为什么也没有删除之前的呢?
>
>谢谢,
>王磊
>
>
>
>wanglei2@geekplus.com.cn