Posted to user-zh@flink.apache.org by 绘梦飘雪 <31...@qq.com> on 2020/06/10 11:18:35 UTC
On using DataStreamUtils.reinterpretAsKeyedStream
Hi,
I have a pipeline that calls keyBy repeatedly on the same set of keys:
DataStream resStream = demoStream.keyBy(groupKeys)
    .flatMap(new MyFlatmapFunction())
    .keyBy(groupKeys)
    .process(new MyProcessFunction())
    .keyBy(groupKeys)
    .timeWindow(Time.seconds(1))
    .aggregate(new MyAggFunction())
    .keyBy(groupKeys)
    .timeWindow(Time.seconds(1))
    .process(new MyKeyProcessFunction());
I would like to rewrite this pipeline with DataStreamUtils.reinterpretAsKeyedStream. This is what I tried:
int[] groupKeys = new int[]{0,2,3};
DataStream proStream = DataStreamUtils.reinterpretAsKeyedStream(demoStream, new MyKeySelector2(groupKeys)) // MyKeySelector2 is my own KeySelector implementation
    .flatMap(new MyFlatmapFunction());
Written this way, the stream that comes out of flatMap is not a KeyedStream. Is something wrong here? How should DataStreamUtils.reinterpretAsKeyedStream be used?
Re: On using DataStreamUtils.reinterpretAsKeyedStream
Posted by Congxian Qiu <qc...@gmail.com>.
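Hi,
DataStreamUtils.reinterpretAsKeyedStream does return a KeyedStream, but applying flatMap to a KeyedStream gives you a plain DataStream again. As Jark said, you can call reinterpretAsKeyedStream once more to get a KeyedStream back.
Also note that in versions before 1.8, using this feature can lose data; see this issue [1] for details.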
[1] https://issues.apache.org/jira/browse/FLINK-12296
Best,
Congxian
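Jark Wu <im...@gmail.com> wrote on Wed, Jun 10, 2020 at 10:29 PM:
> Hi,
>
> You can call DataStreamUtils.reinterpretAsKeyedStream(proStream, new
> MyKeySelector2(groupKeys)) again to reinterpret it as a KeyedStream.
> If your flatMap runs with the same parallelism as its upstream operator, the
> two are chained together at runtime, so the distribution of keys does not change.
>
> Best,
> Jark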
Re: On using DataStreamUtils.reinterpretAsKeyedStream
Posted by Jark Wu <im...@gmail.com>.
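Hi,
You can call DataStreamUtils.reinterpretAsKeyedStream(proStream, new
MyKeySelector2(groupKeys)) again to reinterpret it as a KeyedStream.
If your flatMap runs with the same parallelism as its upstream operator, the two are chained together at runtime, so the distribution of keys does not change.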
Best,
Jark
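The reasoning about key distribution can be illustrated without Flink. What follows is a minimal plain-Java sketch, not Flink code: subtaskFor is a hypothetical stand-in for key-based routing (Flink's real assignment hashes keys into key groups and differs in detail), and the class name, the helper names and the int-array records are all made up for illustration. Records are keyed on fields 0, 2 and 3, as in the original question. The sketch suggests why re-applying reinterpretAsKeyedStream is only safe when the chained flatMap leaves the key fields untouched: the stream must already be partitioned exactly as keyBy would partition it.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

public class KeyPartitionSketch {

    // Hypothetical routing rule: records with equal key fields go to the same subtask.
    // This only mimics the idea of keyBy; it is not Flink's key-group assignment.
    static int subtaskFor(int[] record, int[] groupKeys, int parallelism) {
        int[] key = new int[groupKeys.length];
        for (int i = 0; i < groupKeys.length; i++) {
            key[i] = record[groupKeys[i]];
        }
        return Math.floorMod(Arrays.hashCode(key), parallelism);
    }

    // Models a chained flatMap: every output stays on the subtask that held its input.
    // Returns true if each output is also where key-based routing would have sent it.
    static boolean stillPartitionedByKey(List<int[]> input,
                                         Function<int[], List<int[]>> flatMap,
                                         int[] groupKeys,
                                         int parallelism) {
        for (int[] in : input) {
            int current = subtaskFor(in, groupKeys, parallelism);
            for (int[] out : flatMap.apply(in)) {
                if (subtaskFor(out, groupKeys, parallelism) != current) {
                    return false;
                }
            }
        }
        return true;
    }

    // Emits two records per input and modifies only field 1, which is not a key field.
    static List<int[]> keepKey(int[] r) {
        return Arrays.asList(
                new int[]{r[0], r[1] + 1, r[2], r[3]},
                new int[]{r[0], r[1] + 2, r[2], r[3]});
    }

    // Rewrites key field 0, so the output belongs to a different key than the input.
    static List<int[]> changeKey(int[] r) {
        return Collections.singletonList(new int[]{r[0] + 1, r[1], r[2], r[3]});
    }

    public static void main(String[] args) {
        int[] groupKeys = {0, 2, 3};
        List<int[]> input = Arrays.asList(new int[]{1, 10, 2, 3}, new int[]{4, 20, 5, 6});
        // Key fields untouched: the existing partitioning is still valid.
        System.out.println(stillPartitionedByKey(input, KeyPartitionSketch::keepKey, groupKeys, 4)); // true
        // Key field changed: records now sit on the wrong subtask for their key.
        System.out.println(stillPartitionedByKey(input, KeyPartitionSketch::changeKey, groupKeys, 4)); // false
    }
}
```

In the second case a real job would need a regular keyBy, which reshuffles the data, rather than reinterpretAsKeyedStream.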
On Wed, 10 Jun 2020 at 21:15, Yichao Yang <10...@qq.com> wrote:
> Hi
>
> The result of flatMap is not a KeyedStream in the first place; only keyBy
> returns a KeyedStream.
>
> Best,
> Yichao Yang
Re: On using DataStreamUtils.reinterpretAsKeyedStream
Posted by Yichao Yang <10...@qq.com>.
Hi
The result of flatMap is not a KeyedStream in the first place; only keyBy returns a KeyedStream.
Best,
Yichao Yang