Posted to user-zh@flink.apache.org by 王佩 <wa...@cmcm.com> on 2019/09/17 01:55:24 UTC
Split a stream into any number of streams
Hi All,
I want to split a stream into any number of streams according to a field,
and then process the split stream one by one.
Can this be achieved? What should I do?
Regards,
Pei
Re: Split a stream into any number of streams
Posted by Wesley Peng <we...@thepeng.eu>.
Hi
on 2019/9/17 10:28, 王佩 wrote:
> // How should I do it?
> splitStream.select("productID1").print();
If I understand that correctly, you want a dynamic number of sinks?
regards
Re: Split a stream into any number of streams
Posted by 王佩 <wa...@cmcm.com>.
Thank you!
Here is an example:

// Execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Input source
DataStreamSource<Tuple3<String, String, String>> source = env.fromElements(
        new Tuple3<>("productID1", "click", "user_1"),
        new Tuple3<>("productID1", "click", "user_2"),
        new Tuple3<>("productID1", "browse", "user_1"),
        new Tuple3<>("productID2", "browse", "user_1"),
        new Tuple3<>("productID2", "click", "user_2"),
        new Tuple3<>("productID2", "click", "user_1")
        ....
        new Tuple3<>("productID50", "click", "user_1")
        ....
        new Tuple3<>("productID90", "click", "user_1")
        ....
        new Tuple3<>("productID100", "click", "user_1")
);

// Split the stream
SplitStream<Tuple3<String, String, String>> splitStream =
        source.split(new OutputSelector<Tuple3<String, String, String>>() {
            @Override
            public Iterable<String> select(Tuple3<String, String, String> value) {
                ArrayList<String> output = new ArrayList<>();
                output.add(value.f0);
                return output;
            }
        });

// Select a stream
// Here: I want to select products 1 to 100 and then process each one.
// How should I do it?
splitStream.select("productID1").print();

env.execute();
Regards.
Wesley Peng <we...@thepeng.eu> wrote on Tuesday, September 17, 2019 at 10:05 AM:
>
>
> on 2019/9/17 9:55, 王佩 wrote:
> > I want to split a stream into any number of streams according to a field,
> > and then process the split stream one by one.
>
> I think that can be done easily; refer to:
>
> https://stackoverflow.com/questions/53588554/apache-flink-using-filter-or-split-to-split-a-stream
>
> regards.
>
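For reference, the routing done by the OutputSelector above is essentially demultiplexing records by their first tuple field. Outside Flink, the idea can be sketched in plain Java (a simplified illustrative model, not the Flink API):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DemuxSketch {
    // Group records by their first field, creating buckets on the fly --
    // no fixed list of bucket names is required.
    public static Map<String, List<String[]>> demux(List<String[]> records) {
        Map<String, List<String[]>> streams = new HashMap<>();
        for (String[] r : records) {
            streams.computeIfAbsent(r[0], k -> new ArrayList<>()).add(r);
        }
        return streams;
    }

    public static void main(String[] args) {
        // Records modeled as {productID, action, user}
        List<String[]> records = Arrays.asList(
                new String[]{"productID1", "click", "user_1"},
                new String[]{"productID1", "browse", "user_1"},
                new String[]{"productID2", "click", "user_2"});
        Map<String, List<String[]>> streams = demux(records);
        System.out.println(streams.get("productID1").size()); // 2
        System.out.println(streams.get("productID2").size()); // 1
    }
}
```

The point of the sketch is that the bucket for "productID50" appears the moment such a record arrives, which is exactly the dynamic behavior the select("productID1") call cannot express.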
Re: Reply: Split a stream into any number of streams
Posted by cai yi <cy...@gmail.com>.
For your scenario, it seems you could use Broadcast to broadcast your custom event rules and join them with the data stream; you can then handle the result in a process function...
On 2019/9/17 at 4:52 PM, “venn” <wx...@163.com> wrote:
I'm afraid not; both side output and split need to know in advance how many streams to split into.
For example, side output requires defining the tags first:
val late = new OutputTag[LateDataEvent]("late")
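To illustrate the broadcast suggestion above: the set of rules (event types to act on) is itself a stream, so it can grow while data keeps flowing. A rough plain-Java model of the idea (a hypothetical sketch, not Flink's broadcast-state API):

```java
import java.util.HashSet;
import java.util.Set;

public class BroadcastRuleSketch {
    // Stands in for broadcast state: the event types currently of interest.
    private final Set<String> activeTypes = new HashSet<>();

    // Rule-stream side: a newly broadcast rule updates the state.
    public void onRule(String eventType) {
        activeTypes.add(eventType);
    }

    // Data-stream side: check each record against the current rules.
    public boolean matches(String eventType) {
        return activeTypes.contains(eventType);
    }

    public static void main(String[] args) {
        BroadcastRuleSketch sketch = new BroadcastRuleSketch();
        sketch.onRule("click");
        System.out.println(sketch.matches("click"));  // true
        System.out.println(sketch.matches("browse")); // false
        sketch.onRule("browse"); // rules can be added while data flows
        System.out.println(sketch.matches("browse")); // true
    }
}
```

In Flink the rule set would live in broadcast state inside a process function rather than in a plain field, but the shape of the logic is the same: new event types need no job restart.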
Reply: Split a stream into any number of streams
Posted by Jun Zhang <82...@qq.com>.
Would applying keyBy to the DataStream solve this?
------------------ Original Message ------------------
From: "venn" <wxchunjhyy@163.com>;
Sent: Tuesday, September 17, 2019, 4:51 PM
To: "user-zh" <user-zh@flink.apache.org>;
Subject: Reply: Split a stream into any number of streams
I'm afraid not; both side output and split need to know in advance how many streams to split into.
For example, side output requires defining the tags first:
val late = new OutputTag[LateDataEvent]("late")
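keyBy, as suggested above, is attractive here precisely because it needs no predeclared key set: records are partitioned by key, and per-key state is created lazily the first time a key is seen. A plain-Java analogue of that behavior (an illustrative sketch, not the Flink API):

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class KeyBySketch {
    // Count records per key; state for a key appears the first time
    // the key is seen, much like Flink keyed state under keyBy.
    public static Map<String, Integer> countByKey(List<String> keys) {
        Map<String, Integer> counts = new HashMap<>();
        for (String k : keys) {
            counts.merge(k, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = countByKey(
                Arrays.asList("productID1", "productID2", "productID1"));
        System.out.println(counts.get("productID1")); // 2
        System.out.println(counts.get("productID2")); // 1
    }
}
```

The trade-off is that keyBy yields one logical keyed stream, not 1000 separate DataStream objects, so per-event-type sinks would still need a different mechanism.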
Reply: Split a stream into any number of streams
Posted by venn <wx...@163.com>.
I'm afraid not; both side output and split need to know in advance how many streams to split into.
For example, side output requires defining the tags first:
val late = new OutputTag[LateDataEvent]("late")
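venn's caveat, that split and side outputs require the output set to be declared up front, can be illustrated with a hypothetical plain-Java sketch (not the Flink API): only predeclared tags have a destination, so records for an unforeseen event type have nowhere to go.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class FixedTagsSketch {
    // Route each event into a predeclared output bucket, analogous to
    // OutputTag in Flink; events with an undeclared tag are dropped.
    public static Map<String, List<String>> route(Set<String> declaredTags,
                                                  List<String> eventTags) {
        Map<String, List<String>> outputs = new HashMap<>();
        for (String tag : declaredTags) {
            outputs.put(tag, new ArrayList<>());
        }
        for (String tag : eventTags) {
            List<String> sink = outputs.get(tag);
            if (sink != null) {          // undeclared tag: nowhere to go
                sink.add("event-for-" + tag);
            }
        }
        return outputs;
    }

    public static void main(String[] args) {
        Map<String, List<String>> out = route(
                new HashSet<>(Arrays.asList("late", "onTime")),
                Arrays.asList("late", "onTime", "veryLate"));
        System.out.println(out.get("late").size());      // 1
        System.out.println(out.containsKey("veryLate")); // false
    }
}
```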
Re: Split a stream into any number of streams
Posted by 王佩 <wa...@cmcm.com>.
Here is the situation. Say there are 1000 event types (distinguished by a field, and more will keep being added), all in one Kafka topic.
After Flink reads from Kafka, the data is a single DataStream. I want to split each event type out so that each becomes its own DataStream; afterwards I can apply all kinds of processing to each one, such as DataStream async I/O or sinking the DataStream to Parquet.
1. With split...select, select(eventName) requires the event name to be a fixed, known value.
2. With side output, the output tags must be defined in advance; with 1000 event types (and growing), that means defining 1000+ output tags.
Thanks!
cai yi <cy...@gmail.com> wrote on Tuesday, September 17, 2019 at 1:33 PM:
> You can use Side Output: route the input stream to different custom OutputTags as needed, and
> finally use DataStream.getSideOutput(outputTag) to retrieve the stream you want and process it!
Re: Split a stream into any number of streams
Posted by cai yi <cy...@gmail.com>.
You can use Side Output: route the input stream to different custom OutputTags as needed, and finally use DataStream.getSideOutput(outputTag) to retrieve the stream you want and process it!
On 2019/9/17 at 10:05 AM, “Wesley Peng” <we...@thepeng.eu> wrote:
on 2019/9/17 9:55, 王佩 wrote:
> I want to split a stream into any number of streams according to a field,
> and then process the split stream one by one.
I think that can be done easily; refer to:
https://stackoverflow.com/questions/53588554/apache-flink-using-filter-or-split-to-split-a-stream
regards.
Re: Split a stream into any number of streams
Posted by Wesley Peng <we...@thepeng.eu>.
on 2019/9/17 9:55, 王佩 wrote:
> I want to split a stream into any number of streams according to a field,
> and then process the split stream one by one.
I think that can be done easily; refer to:
https://stackoverflow.com/questions/53588554/apache-flink-using-filter-or-split-to-split-a-stream
regards.