Posted to user-zh@flink.apache.org by 王佩 <wa...@cmcm.com> on 2019/09/17 01:55:24 UTC

Split a stream into any number of streams

Hi All,

I want to split a stream into any number of streams according to a field,
and then process the split streams one by one.

Can this be achieved? What should I do?

Regards,
Pei

Re: Split a stream into any number of streams

Posted by Wesley Peng <we...@thepeng.eu>.
Hi

on 2019/9/17 10:28, 王佩 wrote:
>       // How should I do it?
> splitStream.select("productID1").print();

If I understand that correctly, you want a dynamic number of sinks?

regards

Re: Split a stream into any number of streams

Posted by 王佩 <wa...@cmcm.com>.
Thank you!
Here is an example:

      // Needed imports:
      //   org.apache.flink.api.java.tuple.Tuple3
      //   org.apache.flink.streaming.api.collector.selector.OutputSelector
      //   org.apache.flink.streaming.api.datastream.DataStreamSource
      //   org.apache.flink.streaming.api.datastream.SplitStream
      //   org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
      //   java.util.ArrayList

      // Execution environment
      StreamExecutionEnvironment env =
              StreamExecutionEnvironment.getExecutionEnvironment();

      // Input source
      DataStreamSource<Tuple3<String, String, String>> source = env.fromElements(
              new Tuple3<>("productID1", "click", "user_1"),
              new Tuple3<>("productID1", "click", "user_2"),
              new Tuple3<>("productID1", "browse", "user_1"),
              new Tuple3<>("productID2", "browse", "user_1"),
              new Tuple3<>("productID2", "click", "user_2"),
              new Tuple3<>("productID2", "click", "user_1"),
              // ...
              new Tuple3<>("productID50", "click", "user_1"),
              // ...
              new Tuple3<>("productID90", "click", "user_1"),
              // ...
              new Tuple3<>("productID100", "click", "user_1")
      );

      // Split the stream: tag each element with its product id (field f0)
      SplitStream<Tuple3<String, String, String>> splitStream =
              source.split(new OutputSelector<Tuple3<String, String, String>>() {
                  @Override
                  public Iterable<String> select(Tuple3<String, String, String> value) {
                      ArrayList<String> output = new ArrayList<>();
                      output.add(value.f0);
                      return output;
                  }
              });

      // Select a stream
      // Here: I want to select products 1 to 100 and then process each one.
      // How should I do it?
      splitStream.select("productID1").print();

      env.execute();
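
For reference, one hedged sketch of how that select step could look if the product IDs are all known up front and follow the productID<n> pattern above (note that split/select was later deprecated in Flink in favor of side outputs):

      // Sketch only: select each known product id in a loop; print() stands in
      // for the real per-product processing.
      for (int i = 1; i <= 100; i++) {
          splitStream.select("productID" + i).print();
      }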

Regards.

Wesley Peng <we...@thepeng.eu> wrote on Tue, Sep 17, 2019 at 10:05 AM:

>
>
> on 2019/9/17 9:55, 王佩 wrote:
> > I want to split a stream into any number of streams according to a field,
> > and then process the split streams one by one.
>
> I think that should be easily done. Refer to:
>
> https://stackoverflow.com/questions/53588554/apache-flink-using-filter-or-split-to-split-a-stream
>
> regards.
>

Re: Re: Split a stream into any number of streams

Posted by cai yi <cy...@gmail.com>.
For this scenario, it seems you could use Broadcast to broadcast your custom event rules and then join them with the data stream; after that you can handle everything in a process function...
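
A minimal sketch of that idea, assuming a separate ruleStream of event-name strings and Flink's broadcast state API (all names below are illustrative):

    // Sketch only: broadcast "rules" (here, plain event-name strings) and join them
    // with the main data stream; per-event handling lives in processElement().
    // Needs: MapStateDescriptor, BroadcastStream, BroadcastProcessFunction, Collector.
    MapStateDescriptor<String, String> ruleDesc =
            new MapStateDescriptor<>("rules", String.class, String.class);

    BroadcastStream<String> ruleBroadcast = ruleStream.broadcast(ruleDesc);

    source.connect(ruleBroadcast)
          .process(new BroadcastProcessFunction<Tuple3<String, String, String>, String, String>() {
              @Override
              public void processElement(Tuple3<String, String, String> value,
                                         ReadOnlyContext ctx,
                                         Collector<String> out) throws Exception {
                  // Emit only events that match a broadcast rule.
                  if (ctx.getBroadcastState(ruleDesc).contains(value.f0)) {
                      out.collect(value.f0 + " -> " + value.f1);
                  }
              }

              @Override
              public void processBroadcastElement(String rule,
                                                  Context ctx,
                                                  Collector<String> out) throws Exception {
                  // Register / refresh the rule, keyed by event name.
                  ctx.getBroadcastState(ruleDesc).put(rule, rule);
              }
          })
          .print();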

On 2019/9/17 4:52 PM, "venn" <wx...@163.com> wrote:

    I'm afraid not; both side output and split need to know in advance how many streams to split into.

    For example, side output requires defining the tag first:
    	val late = new OutputTag[LateDataEvent]("late")


    -----Original Message-----
    From: user-zh-return-1164-wxchunjhyy=163.com@flink.apache.org <us...@flink.apache.org> on behalf of 王佩
    Sent: Tuesday, September 17, 2019 4:25 PM
    To: user-zh <us...@flink.apache.org>
    Subject: Re: Split a stream into any number of streams

    Here is the situation: say there are 1000 event types (distinguished by a field, and new ones keep being added), all in one Kafka topic.

    After Flink reads from Kafka I have one DataStream. I want to split every event type out so that each one becomes its own DataStream; afterwards I can process each event in all sorts of ways, such as async I/O on the DataStream or a DataStream sink to Parquet.

    1. With split...select, select(eventName) requires the event name to be a fixed, known value.
    2. With side output, the output tags have to be defined up front; with 1000 event types (and growing), that means defining 1000+ output tags.

    Thanks!

    cai yi <cy...@gmail.com> wrote on Tue, Sep 17, 2019 at 1:33 PM:

    > You can use Side Output:
    > route the input stream to different custom OutputTags as needed, and finally use DataStream.getSideOutput(outputTag) to retrieve the stream you want and process it!
    >
    > On 2019/9/17 10:05 AM, "Wesley Peng" <we...@thepeng.eu> wrote:
    >
    >
    >
    >     on 2019/9/17 9:55, 王佩 wrote:
    >     > I want to split a stream into any number of streams according to a field,
    >     > and then process the split streams one by one.
    >
    >     I think that should be easily done. Refer to:
    >
    > https://stackoverflow.com/questions/53588554/apache-flink-using-filter-or-split-to-split-a-stream
    >
    >     regards.
    >
    >
    

Re: Split a stream into any number of streams

Posted by Jun Zhang <82...@qq.com>.
Would applying a keyBy operation on the DataStream solve this?
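
A minimal sketch of that idea, reusing the Tuple3 events from the example earlier in the thread (the logic in processElement() is only a placeholder):

    // Sketch only: partition by the event field and handle every key inside one
    // KeyedProcessFunction instead of creating a separate DataStream per event.
    // Needs: KeySelector, KeyedProcessFunction, Collector.
    source.keyBy(new KeySelector<Tuple3<String, String, String>, String>() {
              @Override
              public String getKey(Tuple3<String, String, String> value) {
                  return value.f0;   // the event / product id
              }
          })
          .process(new KeyedProcessFunction<String, Tuple3<String, String, String>, String>() {
              @Override
              public void processElement(Tuple3<String, String, String> value,
                                         Context ctx,
                                         Collector<String> out) {
                  // ctx.getCurrentKey() is the event id; branch on it here if needed.
                  out.collect(ctx.getCurrentKey() + " -> " + value.f1);
              }
          })
          .print();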




------------------ Original Message ------------------
From: "venn" <wxchunjhyy@163.com>;
Sent: Tuesday, September 17, 2019 4:51 PM
To: "user-zh" <user-zh@flink.apache.org>;
Subject: Re: Split a stream into any number of streams



I'm afraid not; both side output and split need to know in advance how many streams to split into.

For example, side output requires defining the tag first:
	val late = new OutputTag[LateDataEvent]("late")


-----Original Message-----
From: user-zh-return-1164-wxchunjhyy=163.com@flink.apache.org <user-zh-return-1164-wxchunjhyy=163.com@flink.apache.org> on behalf of 王佩
Sent: Tuesday, September 17, 2019 4:25 PM
To: user-zh <user-zh@flink.apache.org>
Subject: Re: Split a stream into any number of streams

Here is the situation: say there are 1000 event types (distinguished by a field, and new ones keep being added), all in one Kafka topic.

After Flink reads from Kafka I have one DataStream. I want to split every event type out so that each one becomes its own DataStream; afterwards I can process each event in all sorts of ways, such as async I/O on the DataStream or a DataStream sink to Parquet.

1. With split...select, select(eventName) requires the event name to be a fixed, known value.
2. With side output, the output tags have to be defined up front; with 1000 event types (and growing), that means defining 1000+ output tags.

Thanks!

cai yi <cy.221b@gmail.com> wrote on Tue, Sep 17, 2019 at 1:33 PM:

> You can use Side Output:
> route the input stream to different custom OutputTags as needed, and finally use DataStream.getSideOutput(outputTag) to retrieve the stream you want and process it!
>
> On 2019/9/17 10:05 AM, "Wesley Peng" <wesley@thepeng.eu> wrote:
>
>
>
>     on 2019/9/17 9:55, 王佩 wrote:
>     > I want to split a stream into any number of streams according to a field,
>     > and then process the split streams one by one.
>
>     I think that should be easily done. Refer to:
>
> https://stackoverflow.com/questions/53588554/apache-flink-using-filter-or-split-to-split-a-stream
>
>     regards.
>
>

Re: Split a stream into any number of streams

Posted by venn <wx...@163.com>.
I'm afraid not; both side output and split need to know in advance how many streams to split into.

For example, side output requires defining the tag first:
	val late = new OutputTag[LateDataEvent]("late")


-----Original Message-----
From: user-zh-return-1164-wxchunjhyy=163.com@flink.apache.org <us...@flink.apache.org> on behalf of 王佩
Sent: Tuesday, September 17, 2019 4:25 PM
To: user-zh <us...@flink.apache.org>
Subject: Re: Split a stream into any number of streams

Here is the situation: say there are 1000 event types (distinguished by a field, and new ones keep being added), all in one Kafka topic.

After Flink reads from Kafka I have one DataStream. I want to split every event type out so that each one becomes its own DataStream; afterwards I can process each event in all sorts of ways, such as async I/O on the DataStream or a DataStream sink to Parquet.

1. With split...select, select(eventName) requires the event name to be a fixed, known value.
2. With side output, the output tags have to be defined up front; with 1000 event types (and growing), that means defining 1000+ output tags.

Thanks!

cai yi <cy...@gmail.com> wrote on Tue, Sep 17, 2019 at 1:33 PM:

> You can use Side Output:
> route the input stream to different custom OutputTags as needed, and finally use DataStream.getSideOutput(outputTag) to retrieve the stream you want and process it!
>
> On 2019/9/17 10:05 AM, "Wesley Peng" <we...@thepeng.eu> wrote:
>
>
>
>     on 2019/9/17 9:55, 王佩 wrote:
>     > I want to split a stream into any number of streams according to a field,
>     > and then process the split streams one by one.
>
>     I think that should be easily done. Refer to:
>
> https://stackoverflow.com/questions/53588554/apache-flink-using-filter-or-split-to-split-a-stream
>
>     regards.
>
>

Re: Split a stream into any number of streams

Posted by 王佩 <wa...@cmcm.com>.
Here is the situation: say there are 1000 event types (distinguished by a field, and new ones keep being added), all in one Kafka topic.

After Flink reads from Kafka I have one DataStream. I want to split every event type out so that each one becomes its own DataStream; afterwards I can process each event in all sorts of ways, such as async I/O on the DataStream or a DataStream sink to Parquet.

1. With split...select, select(eventName) requires the event name to be a fixed, known value.
2. With side output, the output tags have to be defined up front; with 1000 event types (and growing), that means defining 1000+ output tags.

Thanks!
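
One hedged sketch of how the tag concern in point 2 could be softened: the OutputTags still have to be known when the job starts, but they can be generated from a list of currently known event names instead of being written out by hand (eventNames and the routing below are illustrative; genuinely new events would still need a fallback stream or a job restart):

      // Sketch only: build side-output tags from the known event names, route in one
      // ProcessFunction, then attach per-event processing to each side output.
      // Needs: OutputTag, ProcessFunction, SingleOutputStreamOperator, Collector,
      // plus java.util.Arrays / HashMap / List / Map.
      List<String> eventNames = Arrays.asList("productID1", "productID2" /* , ... */);

      Map<String, OutputTag<Tuple3<String, String, String>>> tags = new HashMap<>();
      for (String name : eventNames) {
          tags.put(name, new OutputTag<Tuple3<String, String, String>>(name) {});
      }

      SingleOutputStreamOperator<Tuple3<String, String, String>> mainStream =
              source.process(new ProcessFunction<Tuple3<String, String, String>,
                                                 Tuple3<String, String, String>>() {
                  @Override
                  public void processElement(Tuple3<String, String, String> value,
                                             Context ctx,
                                             Collector<Tuple3<String, String, String>> out) {
                      OutputTag<Tuple3<String, String, String>> tag = tags.get(value.f0);
                      if (tag != null) {
                          ctx.output(tag, value);   // known event: route to its side output
                      } else {
                          out.collect(value);       // unknown/new event: stays on the main stream
                      }
                  }
              });

      for (OutputTag<Tuple3<String, String, String>> tag : tags.values()) {
          mainStream.getSideOutput(tag).print();    // per-event async I/O / Parquet sink goes here
      }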

cai yi <cy...@gmail.com> wrote on Tue, Sep 17, 2019 at 1:33 PM:

> You can use Side Output:
> route the input stream to different custom OutputTags as needed, and finally use DataStream.getSideOutput(outputTag) to retrieve the stream you want and process it!
>
> On 2019/9/17 10:05 AM, "Wesley Peng" <we...@thepeng.eu> wrote:
>
>
>
>     on 2019/9/17 9:55, 王佩 wrote:
>     > I want to split a stream into any number of streams according to a field,
>     > and then process the split streams one by one.
>
>     I think that should be easily done. Refer to:
>
> https://stackoverflow.com/questions/53588554/apache-flink-using-filter-or-split-to-split-a-stream
>
>     regards.
>
>

Re: Split a stream into any number of streams

Posted by cai yi <cy...@gmail.com>.
You can use Side Output: route the input stream to different custom OutputTags as needed, and finally use DataStream.getSideOutput(outputTag) to retrieve the stream you want and process it!

On 2019/9/17 10:05 AM, "Wesley Peng" <we...@thepeng.eu> wrote:

    
    
    on 2019/9/17 9:55, 王佩 wrote:
    > I want to split a stream into any number of streams according to a field,
    > and then process the split streams one by one.
    
    I think that should be easily done. Refer to:
    https://stackoverflow.com/questions/53588554/apache-flink-using-filter-or-split-to-split-a-stream
    
    regards.
    

Re: Split a stream into any number of streams

Posted by Wesley Peng <we...@thepeng.eu>.

on 2019/9/17 9:55, 王佩 wrote:
> I want to split a stream into any number of streams according to a field,
> and then process the split streams one by one.

I think that should be easily done. Refer to:
https://stackoverflow.com/questions/53588554/apache-flink-using-filter-or-split-to-split-a-stream

regards.
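
For reference, a minimal sketch of the filter-based idea from that Stack Overflow thread, assuming the Tuple3 source stream from the example earlier in this thread (the key value is illustrative):

// Sketch only: one filter per key of interest yields one stream per key.
// Needs: DataStream, Tuple3.
DataStream<Tuple3<String, String, String>> product1 =
        source.filter(value -> "productID1".equals(value.f0));
product1.print();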