Posted to user@flink.apache.org by Vijay Balakrishnan <bv...@gmail.com> on 2018/10/25 23:01:24 UTC

Parallelize an incoming stream into 5 streams with the same data

Hi,
I need to broadcast/parallelize an incoming stream (inputStream) into 5
streams with the same data. Each stream is keyed by a different key to do
various grouping operations on the set.

Do I just call inputStream.keyBy() once for each of the 5 different keys and
then use the resulting streams to perform windowing/grouping operations?

DataStream<Long> inputStream = ...;
KeyedStream<Long, ?> keyBy1 = inputStream.keyBy((d) -> d._1);
KeyedStream<Long, ?> keyBy2 = inputStream.keyBy((d) -> d._2);

// do windowing/grouping operations in this function
DataStream<Long> out1Stream = keyBy1.flatMap(new Key1Function());
// do windowing/grouping operations in this function
DataStream<Long> out2Stream = keyBy2.flatMap(new Key2Function());

out1Stream.print();
out2Stream.addSink(new Out2Sink());

Will this work?

Or do I connect each keyed stream to a broadcast stream, like this:

BroadcastStream<Long> broadCastStream = inputStream.broadcast(..);
DataStream out1Stream = keyBy1.connect(broadCastStream)
    .process(new KeyedBroadcastProcessFunction...);

DataStream out2Stream = keyBy2.connect(broadCastStream)
    .process(new KeyedBroadcastProcessFunction...);

Or do I need to use split:

SplitStream<Long> source = inputStream.split(new MyOutputSelector());
source.select("").flatMap(new Key1Function()).addSink(out1Sink);
source.select("").flatMap(new Key2Function()).addSink(out2Sink);


static final class MyOutputSelector implements OutputSelector<Long> {
    @Override
    public Iterable<String> select(Long value) {
        // route every element to the output named ""
        return Collections.singletonList("");
    }
}
TIA,

Re: Parallelize an incoming stream into 5 streams with the same data

Posted by Vijay Balakrishnan <bv...@gmail.com>.
Cool, thanks, Hequn! I will try that approach.

Vijay

On Thu, Nov 1, 2018 at 8:18 PM Hequn Cheng <ch...@gmail.com> wrote:

> Hi Vijay,
>
> > I was hoping to groupBy(key._1, key._2) etc. and then do a tumblingWindow
> > operation on the KeyedStream, and then perform a group operation on the
> > resultant set to get the total count etc.
>
> From your description, I think you can apply a tumbling window (event time
> or processing time) on the KeyedStream first, something like:
>
>> // tumbling processing-time windows
>> input
>>     .keyBy(<key selector>)
>>     .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
>>     .<windowed transformation>(<window function>);
>
> Then you can apply a windowAll after the keyed tumbling window to get
> the final total count.
>
> Best,
> Hequn

Re: Parallelize an incoming stream into 5 streams with the same data

Posted by Hequn Cheng <ch...@gmail.com>.
Hi Vijay,

> I was hoping to groupBy(key._1, key._2) etc. and then do a tumblingWindow
> operation on the KeyedStream, and then perform a group operation on the
> resultant set to get the total count etc.

From your description, I think you can apply a tumbling window (event time
or processing time) on the KeyedStream first, something like:

> // tumbling processing-time windows
> input
>     .keyBy(<key selector>)
>     .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
>     .<windowed transformation>(<window function>);

Then you can apply a windowAll after the keyed tumbling window to get the
final total count.
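To make the two stages concrete, here is a plain-Java model of what such a pipeline computes. It does not use Flink at all, so it is only a sketch under stated assumptions: stage 1 counts records per composite key (k1, k2) inside 5-second tumbling windows aligned to timestamp 0 (Flink's default alignment when no offset is given), playing the role of keyBy(...).window(...) with a counting window function; stage 2 adds those partial counts per window, playing the role of the following windowAll. In Flink itself the composite key would come from a KeySelector returning e.g. a Tuple2. The names Event, keyedCounts, totals and windowStart are hypothetical, chosen for this sketch only.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model, not Flink API: a keyed 5-second tumbling-window count
// followed by a per-window total (the role windowAll plays in the pipeline).
public class TwoStageCountModel {

    static final long WINDOW_MS = 5_000L;

    // Hypothetical record: event timestamp in milliseconds plus two key fields.
    static final class Event {
        final long ts;
        final String k1;
        final String k2;

        Event(long ts, String k1, String k2) {
            this.ts = ts;
            this.k1 = k1;
            this.k2 = k2;
        }
    }

    // Start of the tumbling window that contains ts (windows aligned to 0).
    static long windowStart(long ts) {
        return ts - Math.floorMod(ts, WINDOW_MS);
    }

    // Stage 1 (keyed window): window start -> (composite key [k1, k2] -> count).
    static Map<Long, Map<List<String>, Long>> keyedCounts(List<Event> events) {
        Map<Long, Map<List<String>, Long>> byWindow = new TreeMap<>();
        for (Event e : events) {
            byWindow.computeIfAbsent(windowStart(e.ts), w -> new HashMap<>())
                    .merge(List.of(e.k1, e.k2), 1L, Long::sum);
        }
        return byWindow;
    }

    // Stage 2 (windowAll): add up the per-key partial counts of each window.
    static Map<Long, Long> totals(Map<Long, Map<List<String>, Long>> keyed) {
        Map<Long, Long> result = new TreeMap<>();
        keyed.forEach((window, perKey) ->
                result.put(window, perKey.values().stream().mapToLong(Long::longValue).sum()));
        return result;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
                new Event(1_000, "a", "x"),
                new Event(2_000, "a", "x"),
                new Event(3_000, "b", "y"),
                new Event(6_000, "a", "x"));

        Map<Long, Map<List<String>, Long>> keyed = keyedCounts(events);
        System.out.println(keyed.get(0L));  // per-key counts in window [0, 5000)
        System.out.println(totals(keyed));  // {0=3, 5000=1}
    }
}
```

The design point the model illustrates: the windowAll stage only sums pre-aggregated counts, one per key and window. That matters in Flink because a windowAll operator runs as a single task (parallelism 1), so feeding it partial counts instead of the raw stream keeps that bottleneck small.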

Best,
Hequn



On Fri, Nov 2, 2018 at 6:20 AM Vijay Balakrishnan <bv...@gmail.com>
wrote:

> Thanks, Hequn.
> If I have to do a TumblingWindow operation like:
>
> .timeWindowAll(org.apache.flink.streaming.api.windowing.time.Time.of(FIVE, TimeUnit.SECONDS))
>
> I am not able to do that on the output of keyBy(..), which is a KeyedStream.
>
> I was hoping to groupBy(key._1, key._2) etc. and then do a tumblingWindow
> operation on the KeyedStream, and then perform a group operation on the
> resultant set to get the total count etc.
>
> I am only able to do one of keyBy or timeWindowAll, as follows:
>
> .keyBy(d._1, d._2)
> .process(new KeyProcessing(FIVE_SECONDS, "componentOperation"))
>
> OR
>
> .timeWindowAll(org.apache.flink.streaming.api.windowing.time.Time.of(FIVE, TimeUnit.SECONDS))
> .process(new WindowProcessing(FIVE_SECONDS))
>
> Chaining them doesn't seem to help, because the keying from keyBy is lost
> in the next step:
>
> .keyBy(d._1, d._2)
> .timeWindowAll(org.apache.flink.streaming.api.windowing.time.Time.of(FIVE, TimeUnit.SECONDS))
> .process(new WindowProcessing(FIVE_SECONDS))
>
>
> TIA,
>
> Vijay

Re: Parallelize an incoming stream into 5 streams with the same data

Posted by Vijay Balakrishnan <bv...@gmail.com>.
Thanks, Hequn.
If I have to do a TumblingWindow operation like:

.timeWindowAll(org.apache.flink.streaming.api.windowing.time.Time.of(FIVE, TimeUnit.SECONDS))

I am not able to do that on the output of keyBy(..), which is a KeyedStream.

I was hoping to groupBy(key._1, key._2) etc. and then do a tumblingWindow
operation on the KeyedStream, and then perform a group operation on the
resultant set to get the total count etc.

I am only able to do one of keyBy or timeWindowAll, as follows:

.keyBy(d._1, d._2)
.process(new KeyProcessing(FIVE_SECONDS, "componentOperation"))

OR

.timeWindowAll(org.apache.flink.streaming.api.windowing.time.Time.of(FIVE, TimeUnit.SECONDS))
.process(new WindowProcessing(FIVE_SECONDS))

Chaining them doesn't seem to help, because the keying from keyBy is lost
in the next step:

.keyBy(d._1, d._2)
.timeWindowAll(org.apache.flink.streaming.api.windowing.time.Time.of(FIVE, TimeUnit.SECONDS))
.process(new WindowProcessing(FIVE_SECONDS))


TIA,

Vijay



On Thu, Oct 25, 2018 at 6:31 PM Hequn Cheng <ch...@gmail.com> wrote:

> Hi Vijay,
>
> Option 1 is the right answer. `keyBy1` and `keyBy2` each contain all the data
> in `inputStream`, whereas option 2 replicates all data to each task and
> option 3 splits the data into smaller groups without duplication.
>
> Best, Hequn

Re: Parallelize an incoming stream into 5 streams with the same data

Posted by Hequn Cheng <ch...@gmail.com>.
Hi Vijay,

Option 1 is the right answer. `keyBy1` and `keyBy2` each contain all the data
in `inputStream`, whereas option 2 replicates all data to each task and
option 3 splits the data into smaller groups without duplication.
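A minimal plain-Java model of the difference between options 1 and 2 (no Flink involved, so it runs on its own): keyBy is modelled as sending each record to exactly one of `parallelism` subtasks chosen from the key's hashCode(), and broadcast as sending every record to every subtask. The hash-modulo rule is a simplifying assumption; Flink really assigns keys through key groups, but the property that matters is the same: each keyed branch of the same input sees every record exactly once. Event, keyBy, broadcast and total here are hypothetical names for this sketch, not Flink methods.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Plain-Java model, not Flink API: counts how many records each distribution
// strategy delivers across `parallelism` parallel subtasks.
public class FanOutModel {

    // Hypothetical record with two key fields (stands in for d._1 / d._2).
    static final class Event {
        final String k1;
        final String k2;

        Event(String k1, String k2) {
            this.k1 = k1;
            this.k2 = k2;
        }
    }

    // keyBy-like partitioning: every record goes to exactly one subtask.
    // Assumption: subtask = hashCode mod parallelism (Flink really uses key groups).
    static List<List<Event>> keyBy(List<Event> input, Function<Event, String> key, int parallelism) {
        List<List<Event>> subtasks = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            subtasks.add(new ArrayList<>());
        }
        for (Event e : input) {
            subtasks.get(Math.floorMod(key.apply(e).hashCode(), parallelism)).add(e);
        }
        return subtasks;
    }

    // broadcast-like distribution: every subtask receives a copy of every record.
    static List<List<Event>> broadcast(List<Event> input, int parallelism) {
        List<List<Event>> subtasks = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            subtasks.add(new ArrayList<>(input));
        }
        return subtasks;
    }

    // Total number of records delivered across all subtasks.
    static int total(List<List<Event>> subtasks) {
        int n = 0;
        for (List<Event> s : subtasks) {
            n += s.size();
        }
        return n;
    }

    public static void main(String[] args) {
        List<Event> input = List.of(
                new Event("a", "x"), new Event("b", "x"),
                new Event("a", "y"), new Event("c", "z"));
        int parallelism = 4;

        // Option 1: the same input keyed two ways; each branch gets all 4 records once.
        System.out.println(total(keyBy(input, e -> e.k1, parallelism))); // 4
        System.out.println(total(keyBy(input, e -> e.k2, parallelism))); // 4

        // Option 2: broadcasting copies all 4 records to each of the 4 subtasks.
        System.out.println(total(broadcast(input, parallelism)));        // 16
    }
}
```

With 4 records and parallelism 4, each keyed branch accounts for exactly 4 records, while the broadcast distribution holds 16 copies, which is the replication cost of option 2.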

Best, Hequn
