You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by 李航飞 <te...@163.com> on 2021/08/19 09:02:59 UTC

flink-connector-redis连接器upsert()模式插入问题

版本 flink1.13.2
具体场景
flink-connector-redis自定义连接器,在实现DynamicTableSink接口时,
通过重写getChangelogMode方法,设置ChangelogMode.upsert()模式,以更新的方式进行数据写入,
使用的窗口时cumulate函数 开1天的窗口,小窗口1分钟计算一次


问题:
测试发现,每1分钟都会输出一次,落地的数据一样,
根据upsert()模式,正常来说一样的数据应该不会触发redis存储,这个是什么情况?


Re:Re: Re:Re: flink-connector-redis连接器upsert()模式插入问题

Posted by 李航飞 <te...@163.com>.
你好:

SELECT window_start,window_end,SUM(price),item 
 FROM TABLE(
CUMULATE(TABLE Bid,DESCRIPTOR(bidtime),INTERVAL '1' MINUTES,INTERVAL'10' HOUR))

GROUP BY window_start,window_end,item
语句没有问题,正常每1分钟输出一次,过期时间代码已注释,
public ChangelogMode  getChanglogMode(ChangelogMode arg0){
       return ChangelogMode.upsert();
}
实现RedisMapper 方法  落地redis 有输出语句,每1分钟都会落地一次,我确定数据每次都一样
这upsert 不合理啊
在 2021-08-20 11:15:17,"Caizhi Weng" <ts...@gmail.com> 写道:
>Hi!
>
>之前没注意到是 cumulate 窗口,具体的 group by 语句是什么样的呢?描述里写着“小窗口 1 分钟计算一次”,那 sink
>确实应该每分钟收到一条消息。
>
>sink 在 streaming 场景可以接收 upsert 消息,只要 getChangelogMode 返回的是 upsert 即可。
>
>李航飞 <te...@163.com> 于2021年8月20日周五 上午10:03写道:
>
>> 你好:
>> 我查到 Sink在Streaming场景 只能接收insert-only ,是这样吗?
>>
>>
>>
>> 在 2021-08-20 09:10:44,"李航飞" <te...@163.com> 写道:
>> >你好:
>> >我确实设置了过期时间,我将过期时间的代码注释了,但还是每分钟落地一次。
>> >
>> >
>> >我在RichMapFunction接口里面实现open方法
>> >设置过StateTtlConfig;
>> >之后在RedisConmmand.SETEX设置过期时间
>> >都注释了,但upsert()方法还是没效
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >在 2021-08-19 17:44:02,"Caizhi Weng" <ts...@gmail.com> 写道:
>> >>Hi!
>> >>
>> >>可以在接受 sink 数据的方法里 System.out.println 看看是否真的每分钟收到一次数据。
>> >>
>> >>如果是的话,应该是设置了 state ttl 的缘故。为了防止数据过期,group agg 就算每次收到同样的 key
>> >>value,仍然会往下游发数据(详见 GroupAggFunction#processElement),所以这是一个预期行为。
>> >>
>> >>李航飞 <te...@163.com> 于2021年8月19日周四 下午5:03写道:
>> >>
>> >>> 版本 flink1.13.2
>> >>> 具体场景
>> >>> flink-connector-redis自定义连接器,在实现DynamicTableSink接口时,
>> >>> 通过重写getChangelogMode方法,设置ChangelogMode.upsert()模式,以更新的方式进行数据写入,
>> >>> 使用的窗口时cumulate函数 开1天的窗口,小窗口1分钟计算一次
>> >>>
>> >>>
>> >>> 问题:
>> >>> 测试发现,每1分钟都会输出一次,落地的数据一样,
>> >>> 根据upsert()模式,正常来说一样的数据应该不会触发redis存储,这个是什么情况?
>> >>>
>> >>>
>>

Re: Re:Re: flink-connector-redis连接器upsert()模式插入问题

Posted by Caizhi Weng <ts...@gmail.com>.
Hi!

之前没注意到是 cumulate 窗口,具体的 group by 语句是什么样的呢?描述里写着“小窗口 1 分钟计算一次”,那 sink
确实应该每分钟收到一条消息。

sink 在 streaming 场景可以接收 upsert 消息,只要 getChangelogMode 返回的是 upsert 即可。

李航飞 <te...@163.com> 于2021年8月20日周五 上午10:03写道:

> 你好:
> 我查到 Sink在Streaming场景 只能接收insert-only ,是这样吗?
>
>
>
> 在 2021-08-20 09:10:44,"李航飞" <te...@163.com> 写道:
> >你好:
> >我确实设置了过期时间,我将过期时间的代码注释了,但还是每分钟落地一次。
> >
> >
> >我在RichMapFunction接口里面实现open方法
> >设置过StateTtlConfig;
> >之后在RedisConmmand.SETEX设置过期时间
> >都注释了,但upsert()方法还是没效
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >在 2021-08-19 17:44:02,"Caizhi Weng" <ts...@gmail.com> 写道:
> >>Hi!
> >>
> >>可以在接受 sink 数据的方法里 System.out.println 看看是否真的每分钟收到一次数据。
> >>
> >>如果是的话,应该是设置了 state ttl 的缘故。为了防止数据过期,group agg 就算每次收到同样的 key
> >>value,仍然会往下游发数据(详见 GroupAggFunction#processElement),所以这是一个预期行为。
> >>
> >>李航飞 <te...@163.com> 于2021年8月19日周四 下午5:03写道:
> >>
> >>> 版本 flink1.13.2
> >>> 具体场景
> >>> flink-connector-redis自定义连接器,在实现DynamicTableSink接口时,
> >>> 通过重写getChangelogMode方法,设置ChangelogMode.upsert()模式,以更新的方式进行数据写入,
> >>> 使用的窗口时cumulate函数 开1天的窗口,小窗口1分钟计算一次
> >>>
> >>>
> >>> 问题:
> >>> 测试发现,每1分钟都会输出一次,落地的数据一样,
> >>> 根据upsert()模式,正常来说一样的数据应该不会触发redis存储,这个是什么情况?
> >>>
> >>>
>

Re:Re:Re: flink-connector-redis连接器upsert()模式插入问题

Posted by 李航飞 <te...@163.com>.
你好:
我查到 Sink在Streaming场景 只能接收insert-only ,是这样吗?



在 2021-08-20 09:10:44,"李航飞" <te...@163.com> 写道:
>你好:
>我确实设置了过期时间,我将过期时间的代码注释了,但还是每分钟落地一次。
>
>
>我在RichMapFunction接口里面实现open方法
>设置过StateTtlConfig;
>之后在RedisConmmand.SETEX设置过期时间
>都注释了,但upsert()方法还是没效
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>在 2021-08-19 17:44:02,"Caizhi Weng" <ts...@gmail.com> 写道:
>>Hi!
>>
>>可以在接受 sink 数据的方法里 System.out.println 看看是否真的每分钟收到一次数据。
>>
>>如果是的话,应该是设置了 state ttl 的缘故。为了防止数据过期,group agg 就算每次收到同样的 key
>>value,仍然会往下游发数据(详见 GroupAggFunction#processElement),所以这是一个预期行为。
>>
>>李航飞 <te...@163.com> 于2021年8月19日周四 下午5:03写道:
>>
>>> 版本 flink1.13.2
>>> 具体场景
>>> flink-connector-redis自定义连接器,在实现DynamicTableSink接口时,
>>> 通过重写getChangelogMode方法,设置ChangelogMode.upsert()模式,以更新的方式进行数据写入,
>>> 使用的窗口时cumulate函数 开1天的窗口,小窗口1分钟计算一次
>>>
>>>
>>> 问题:
>>> 测试发现,每1分钟都会输出一次,落地的数据一样,
>>> 根据upsert()模式,正常来说一样的数据应该不会触发redis存储,这个是什么情况?
>>>
>>>

Re:Re: flink-connector-redis连接器upsert()模式插入问题

Posted by 李航飞 <te...@163.com>.
你好:
我确实设置了过期时间,我将过期时间的代码注释了,但还是每分钟落地一次。


我在RichMapFunction接口里面实现open方法
设置过StateTtlConfig;
之后在RedisConmmand.SETEX设置过期时间
都注释了,但upsert()方法还是没效














在 2021-08-19 17:44:02,"Caizhi Weng" <ts...@gmail.com> 写道:
>Hi!
>
>可以在接受 sink 数据的方法里 System.out.println 看看是否真的每分钟收到一次数据。
>
>如果是的话,应该是设置了 state ttl 的缘故。为了防止数据过期,group agg 就算每次收到同样的 key
>value,仍然会往下游发数据(详见 GroupAggFunction#processElement),所以这是一个预期行为。
>
>李航飞 <te...@163.com> 于2021年8月19日周四 下午5:03写道:
>
>> 版本 flink1.13.2
>> 具体场景
>> flink-connector-redis自定义连接器,在实现DynamicTableSink接口时,
>> 通过重写getChangelogMode方法,设置ChangelogMode.upsert()模式,以更新的方式进行数据写入,
>> 使用的窗口时cumulate函数 开1天的窗口,小窗口1分钟计算一次
>>
>>
>> 问题:
>> 测试发现,每1分钟都会输出一次,落地的数据一样,
>> 根据upsert()模式,正常来说一样的数据应该不会触发redis存储,这个是什么情况?
>>
>>

Re: flink-connector-redis连接器upsert()模式插入问题

Posted by Caizhi Weng <ts...@gmail.com>.
Hi!

可以在接受 sink 数据的方法里 System.out.println 看看是否真的每分钟收到一次数据。

如果是的话,应该是设置了 state ttl 的缘故。为了防止数据过期,group agg 就算每次收到同样的 key
value,仍然会往下游发数据(详见 GroupAggFunction#processElement),所以这是一个预期行为。

李航飞 <te...@163.com> 于2021年8月19日周四 下午5:03写道:

> 版本 flink1.13.2
> 具体场景
> flink-connector-redis自定义连接器,在实现DynamicTableSink接口时,
> 通过重写getChangelogMode方法,设置ChangelogMode.upsert()模式,以更新的方式进行数据写入,
> 使用的窗口时cumulate函数 开1天的窗口,小窗口1分钟计算一次
>
>
> 问题:
> 测试发现,每1分钟都会输出一次,落地的数据一样,
> 根据upsert()模式,正常来说一样的数据应该不会触发redis存储,这个是什么情况?
>
>