You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by Lei Wang <le...@gmail.com> on 2022/03/01 03:35:07 UTC

Re: 实时数据入库怎样过滤中间状态,保证最终一致

谢谢,这种是可以。

取窗口内最新的数据怎么写合适呢,我直接这样试了下不符合预期:

env.addSource(consumer).keyBy(new KeySelector<String, String>() {
    @Override
    public String getKey(String value) throws Exception {
        return value;
    }
}).timeWindow(Time.seconds(10)).reduce((a,b)->b).addSink()

实际上逆序输出了窗口内的所有记录。

谢谢,

王磊



On Mon, Feb 28, 2022 at 9:59 AM 18703416172@163.com <18...@163.com>
wrote:

> keyBy 算子之后接 timewindow 窗口, 每个窗口如果有多条数据就取最新的一条。 至于对数据库的压力,取决于这个窗口的大小
>
> > 2022年2月25日 下午6:45,Lei Wang <le...@gmail.com> 写道:
> >
> > 场景描述:
> > Kafka 中的数据直接入到 MySQL 数据库中,数据格式如下:
> > order_id   status
> > 只有两个字段, order_id 为主键,以 replace 覆盖方式写入到数据库中。
> >
> > 对同一个 order_id, status 变化很频繁,为不对数据库造成压力,不会对每一条记录都做入库操作,但一定要保证这个 order_id
> > 最终的状态不丢,但这个最终的状态也不确定是多少。
> >
> > 我的做法是 KeyBy  orderId 后判断两条记录的时间间隔,如果时间间隔太小不做入库操作,但如果这个 order_id
> > 最后来的两条记录时间间隔太小,会导致最终的状态丢失。
> >
> > 请问有什么其他的解决方法吗?
> >
> > 谢谢,
> > 王磊
>
>

Re: 实时数据入库怎样过滤中间状态,保证最终一致

Posted by Guo Thompson <gw...@gmail.com>.
可以参考jdbc-connector写mysql的思路,在java里面用hashMap来存,key为 order_id
 ,然后定时把map的数据刷mysql

18703416172@163.com <18...@163.com> 于2022年3月1日周二 14:40写道:

> 首先确定 source 事件有 eventTime ,比如 source 的返回类型为 MySource
> 示例代码如下:
> static class MySource {
>     Long ts;
>     String key;
>     Object object;
> }
> env.addSource(new SourceFunction<MySource>() {
>     @Override
>     public void run(SourceContext<MySource> ctx) throws Exception {
>         ctx.collect(new MySource());
>     }
>     @Override
>     public void cancel() {
>     }
> }).keyBy(new KeySelector<MySource, String>() {
>     @Override
>     public String getKey(MySource value) throws Exception {
>         return value.key;
>     }
> }).timeWindow(Time.seconds(10)).process(new
> ProcessWindowFunction<MySource, Object, String, TimeWindow>() {
>     @Override
>     public void process(String s, Context context, Iterable<MySource>
> elements, Collector<Object> out) throws Exception {
>         List<MySource> collect =
> Lists.newArrayList(elements).stream().sorted(new Comparator<MySource>() {
>             @Override
>             public int compare(MySource o1, MySource o2) {
>                 return o2.ts.compareTo(o1.ts);
>             }
>         }).collect(Collectors.toList());
>         if (collect.size() > 0){
>             out.collect(collect.get(0).object);
>         }
>     }
> }).addSink(new SinkFunction<Object>() {
>     @Override
>     public void invoke(Object value, Context context) throws Exception {
>         System.out.println(value);
>     }
> });
>
>
>
>
>
> > 2022年3月1日 上午11:35,Lei Wang <le...@gmail.com> 写道:
> >
> > 谢谢,这种是可以。
> >
> > 取窗口内最新的数据怎么写合适呢,我直接这样试了下不符合预期:
> >
> > env.addSource(consumer).keyBy(new KeySelector<String, String>() {
> >    @Override
> >    public String getKey(String value) throws Exception {
> >        return value;
> >    }
> > }).timeWindow(Time.seconds(10)).reduce((a,b)->b).addSink()
> >
> > 实际上逆序输出了窗口内的所有记录。
> >
> > 谢谢,
> >
> > 王磊
> >
> >
> >
> > On Mon, Feb 28, 2022 at 9:59 AM 18703416172@163.com <18703416172@163.com
> >
> > wrote:
> >
> >> keyBy 算子之后接 timewindow 窗口, 每个窗口如果有多条数据就取最新的一条。 至于对数据库的压力,取决于这个窗口的大小
> >>
> >>> 2022年2月25日 下午6:45,Lei Wang <le...@gmail.com> 写道:
> >>>
> >>> 场景描述:
> >>> Kafka 中的数据直接入到 MySQL 数据库中,数据格式如下:
> >>> order_id   status
> >>> 只有两个字段, order_id 为主键,以 replace 覆盖方式写入到数据库中。
> >>>
> >>> 对同一个 order_id, status 变化很频繁,为不对数据库造成压力,不会对每一条记录都做入库操作,但一定要保证这个 order_id
> >>> 最终的状态不丢,但这个最终的状态也不确定是多少。
> >>>
> >>> 我的做法是 KeyBy  orderId 后判断两条记录的时间间隔,如果时间间隔太小不做入库操作,但如果这个 order_id
> >>> 最后来的两条记录时间间隔太小,会导致最终的状态丢失。
> >>>
> >>> 请问有什么其他的解决方法吗?
> >>>
> >>> 谢谢,
> >>> 王磊
> >>
> >>
>
>

Re: 实时数据入库怎样过滤中间状态,保证最终一致

Posted by "18703416172@163.com" <18...@163.com>.
首先确定 source 事件有 eventTime ,比如 source 的返回类型为 MySource
示例代码如下:
static class MySource {
    Long ts;
    String key;
    Object object;
}
env.addSource(new SourceFunction<MySource>() {
    @Override
    public void run(SourceContext<MySource> ctx) throws Exception {
        ctx.collect(new MySource());
    }
    @Override
    public void cancel() {
    }
}).keyBy(new KeySelector<MySource, String>() {
    @Override
    public String getKey(MySource value) throws Exception {
        return value.key;
    }
}).timeWindow(Time.seconds(10)).process(new ProcessWindowFunction<MySource, Object, String, TimeWindow>() {
    @Override
    public void process(String s, Context context, Iterable<MySource> elements, Collector<Object> out) throws Exception {
        List<MySource> collect = Lists.newArrayList(elements).stream().sorted(new Comparator<MySource>() {
            @Override
            public int compare(MySource o1, MySource o2) {
                return o2.ts.compareTo(o1.ts);
            }
        }).collect(Collectors.toList());
        if (collect.size() > 0){
            out.collect(collect.get(0).object);
        }
    }
}).addSink(new SinkFunction<Object>() {
    @Override
    public void invoke(Object value, Context context) throws Exception {
        System.out.println(value);
    }
});





> 2022年3月1日 上午11:35,Lei Wang <le...@gmail.com> 写道:
> 
> 谢谢,这种是可以。
> 
> 取窗口内最新的数据怎么写合适呢,我直接这样试了下不符合预期:
> 
> env.addSource(consumer).keyBy(new KeySelector<String, String>() {
>    @Override
>    public String getKey(String value) throws Exception {
>        return value;
>    }
> }).timeWindow(Time.seconds(10)).reduce((a,b)->b).addSink()
> 
> 实际上逆序输出了窗口内的所有记录。
> 
> 谢谢,
> 
> 王磊
> 
> 
> 
> On Mon, Feb 28, 2022 at 9:59 AM 18703416172@163.com <18...@163.com>
> wrote:
> 
>> keyBy 算子之后接 timewindow 窗口, 每个窗口如果有多条数据就取最新的一条。 至于对数据库的压力,取决于这个窗口的大小
>> 
>>> 2022年2月25日 下午6:45,Lei Wang <le...@gmail.com> 写道:
>>> 
>>> 场景描述:
>>> Kafka 中的数据直接入到 MySQL 数据库中,数据格式如下:
>>> order_id   status
>>> 只有两个字段, order_id 为主键,以 replace 覆盖方式写入到数据库中。
>>> 
>>> 对同一个 order_id, status 变化很频繁,为不对数据库造成压力,不会对每一条记录都做入库操作,但一定要保证这个 order_id
>>> 最终的状态不丢,但这个最终的状态也不确定是多少。
>>> 
>>> 我的做法是 KeyBy  orderId 后判断两条记录的时间间隔,如果时间间隔太小不做入库操作,但如果这个 order_id
>>> 最后来的两条记录时间间隔太小,会导致最终的状态丢失。
>>> 
>>> 请问有什么其他的解决方法吗?
>>> 
>>> 谢谢,
>>> 王磊
>> 
>>