You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by Lei Wang <le...@gmail.com> on 2022/03/01 02:48:46 UTC

keyBy 后的 getKey 函数调用了两次

接收 kafka 的数据后 keyBy, sinkTest 中什么也没做。

env.addSource(consumer).keyBy(new KeySelector<String, String>() {
    @Override
    public String getKey(String value) throws Exception {
        System.out.println(value);
        return value;
    }
}).addSink(new SinkTest(1));


我自己做测试,每发送一条消息console 会打印两次,也就是 System.out.println(value) 被调用了两次。

为什么会这样呢,哪个地方还调用了 getKey 这个函数呢?


谢谢,

王磊

回复: keyBy 后的 getKey 函数调用了两次

Posted by Liu Join <lz...@outlook.com>.
Reduce函数中,a可以认为是状态,你应该返回a试试,最好还是根据时间或者别的做个判断,然后输出,当然这些前提都是你的数据间隔小于10s
从 Windows 版邮件<https://go.microsoft.com/fwlink/?LinkId=550986>发送

发件人: Lei Wang<ma...@gmail.com>
发送时间: 2022年3月1日 11:20
收件人: user-zh@flink.apache.org<ma...@flink.apache.org>
主题: Re: keyBy 后的 getKey 函数调用了两次

谢谢,了解了。

另外一个问题,我 timeWindown 之后只想保留最后一条在这个 window 中的数据直接输出:

env.addSource(consumer).keyBy(new KeySelector<String, String>() {
    @Override
    public String getKey(String value) throws Exception {
        return value;
    }
}).*timeWindow(Time.seconds(10)).reduce((a,b)->b).*addSink(new SinkTest(1));

上面的代码我测试了下不符合预期,其实是逆序输出了窗口中左右的记录。

需要用什么方式实现这个功能比较合适呢?


On Tue, Mar 1, 2022 at 10:52 AM yidan zhao <hi...@gmail.com> wrote:

>
> keyBy的时候调用一次,sink的时候应该也会调用一次。因为keyBy是hash分区,前后是不chain在一起的。sink部分处理输入的ele的时候需要基于keySelector获取key。
>
> Lei Wang <le...@gmail.com> 于2022年3月1日周二 10:49写道:
>
> > 接收 kafka 的数据后 keyBy, sinkTest 中什么也没做。
> >
> > env.addSource(consumer).keyBy(new KeySelector<String, String>() {
> >     @Override
> >     public String getKey(String value) throws Exception {
> >         System.out.println(value);
> >         return value;
> >     }
> > }).addSink(new SinkTest(1));
> >
> >
> > 我自己做测试,每发送一条消息console 会打印两次,也就是 System.out.println(value) 被调用了两次。
> >
> > 为什么会这样呢,哪个地方还调用了 getKey 这个函数呢?
> >
> >
> > 谢谢,
> >
> > 王磊
> >
>


Re: keyBy 后的 getKey 函数调用了两次

Posted by yidan zhao <hi...@gmail.com>.
不用reduce,直接用windowFunction,拿到该窗口的全部数据,返回最后一个。

而且所有的“前”和“后“应该是有数据层面的含义的,比如根据数据中的timestamp?还是啥,如果是有timestamp的话,可以在windowFunction中直接基于timestamp排序,然后输出最后一个。

Lei Wang <le...@gmail.com> 于2022年3月1日周二 11:21写道:

> 谢谢,了解了。
>
> 另外一个问题,我 timeWindown 之后只想保留最后一条在这个 window 中的数据直接输出:
>
> env.addSource(consumer).keyBy(new KeySelector<String, String>() {
>     @Override
>     public String getKey(String value) throws Exception {
>         return value;
>     }
> }).*timeWindow(Time.seconds(10)).reduce((a,b)->b).*addSink(new
> SinkTest(1));
>
> 上面的代码我测试了下不符合预期,其实是逆序输出了窗口中左右的记录。
>
> 需要用什么方式实现这个功能比较合适呢?
>
>
> On Tue, Mar 1, 2022 at 10:52 AM yidan zhao <hi...@gmail.com> wrote:
>
> >
> >
> keyBy的时候调用一次,sink的时候应该也会调用一次。因为keyBy是hash分区,前后是不chain在一起的。sink部分处理输入的ele的时候需要基于keySelector获取key。
> >
> > Lei Wang <le...@gmail.com> 于2022年3月1日周二 10:49写道:
> >
> > > 接收 kafka 的数据后 keyBy, sinkTest 中什么也没做。
> > >
> > > env.addSource(consumer).keyBy(new KeySelector<String, String>() {
> > >     @Override
> > >     public String getKey(String value) throws Exception {
> > >         System.out.println(value);
> > >         return value;
> > >     }
> > > }).addSink(new SinkTest(1));
> > >
> > >
> > > 我自己做测试,每发送一条消息console 会打印两次,也就是 System.out.println(value) 被调用了两次。
> > >
> > > 为什么会这样呢,哪个地方还调用了 getKey 这个函数呢?
> > >
> > >
> > > 谢谢,
> > >
> > > 王磊
> > >
> >
>

Re: keyBy 后的 getKey 函数调用了两次

Posted by Lei Wang <le...@gmail.com>.
谢谢,了解了。

另外一个问题,我 timeWindown 之后只想保留最后一条在这个 window 中的数据直接输出:

env.addSource(consumer).keyBy(new KeySelector<String, String>() {
    @Override
    public String getKey(String value) throws Exception {
        return value;
    }
}).*timeWindow(Time.seconds(10)).reduce((a,b)->b).*addSink(new SinkTest(1));

上面的代码我测试了下不符合预期,其实是逆序输出了窗口中左右的记录。

需要用什么方式实现这个功能比较合适呢?


On Tue, Mar 1, 2022 at 10:52 AM yidan zhao <hi...@gmail.com> wrote:

>
> keyBy的时候调用一次,sink的时候应该也会调用一次。因为keyBy是hash分区,前后是不chain在一起的。sink部分处理输入的ele的时候需要基于keySelector获取key。
>
> Lei Wang <le...@gmail.com> 于2022年3月1日周二 10:49写道:
>
> > 接收 kafka 的数据后 keyBy, sinkTest 中什么也没做。
> >
> > env.addSource(consumer).keyBy(new KeySelector<String, String>() {
> >     @Override
> >     public String getKey(String value) throws Exception {
> >         System.out.println(value);
> >         return value;
> >     }
> > }).addSink(new SinkTest(1));
> >
> >
> > 我自己做测试,每发送一条消息console 会打印两次,也就是 System.out.println(value) 被调用了两次。
> >
> > 为什么会这样呢,哪个地方还调用了 getKey 这个函数呢?
> >
> >
> > 谢谢,
> >
> > 王磊
> >
>

Re: keyBy 后的 getKey 函数调用了两次

Posted by yidan zhao <hi...@gmail.com>.
keyBy的时候调用一次,sink的时候应该也会调用一次。因为keyBy是hash分区,前后是不chain在一起的。sink部分处理输入的ele的时候需要基于keySelector获取key。

Lei Wang <le...@gmail.com> 于2022年3月1日周二 10:49写道:

> 接收 kafka 的数据后 keyBy, sinkTest 中什么也没做。
>
> env.addSource(consumer).keyBy(new KeySelector<String, String>() {
>     @Override
>     public String getKey(String value) throws Exception {
>         System.out.println(value);
>         return value;
>     }
> }).addSink(new SinkTest(1));
>
>
> 我自己做测试,每发送一条消息console 会打印两次,也就是 System.out.println(value) 被调用了两次。
>
> 为什么会这样呢,哪个地方还调用了 getKey 这个函数呢?
>
>
> 谢谢,
>
> 王磊
>