Posted to user@flink.apache.org by anujk <ku...@gmail.com> on 2018/01/10 09:01:09 UTC

Datastream broadcast with KeyBy

Currently we have a Flink pipeline running with Data-Src -> KeyBy ->
ProcessFunction. State management (with RocksDB) and timers are working
well.
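For reference, a keyed pipeline of the shape described above (source -> keyBy -> ProcessFunction with keyed state and timers) might look roughly like this sketch. It assumes the Flink 1.4-era DataStream API. The key selector, the per-key counter and the one-minute timer are hypothetical stand-ins for the real logic, and the RocksDB state backend is configured separately and not shown.

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

class KeyedPipelineSketch {

    // Hypothetical key extractor: each String record is its own key.
    static class IdentityKey implements KeySelector<String, String> {
        @Override
        public String getKey(String value) {
            return value;
        }
    }

    // Hypothetical logic: counts records per key and clears the count when a
    // processing-time timer fires one minute after a record arrives.
    static class CountPerKey extends ProcessFunction<String, String> {
        private transient ValueState<Long> count; // keyed state, scoped to the current key

        @Override
        public void open(Configuration parameters) {
            count = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Long.class));
        }

        @Override
        public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
            Long current = count.value();
            long updated = (current == null ? 0L : current) + 1;
            count.update(updated);
            out.collect(value + " -> " + updated);
            // Timers require a keyed stream; this one fires for the current key.
            ctx.timerService().registerProcessingTimeTimer(
                    ctx.timerService().currentProcessingTime() + 60_000L);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            count.clear();
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // The RocksDB state backend mentioned in the thread would be set on env here (not shown).
        env.fromElements("a", "b", "a")
           .keyBy(new IdentityKey())
           .process(new CountPerKey())
           .print();
        env.execute("keyed-pipeline-sketch");
    }
}
```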
Now we need to extend this with another stream, a config stream, which we
want to broadcast to all parallel instances of the process operator. So we
wanted to connect the data stream with the config stream (with the config
stream being broadcast) and use a CoProcessFunction to handle both streams.

KeyBy uses hash-based partitioning, and even a custom partitioner can return
only one partition (returning an array of selected channels, as
BroadcastPartitioner does, is not an option).
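The routing difference can be illustrated with a toy model in plain Java (this is not Flink code). Flink's own `Partitioner<K>` interface, used via `partitionCustom`, declares `int partition(K key, int numPartitions)`, so it can pick only one target per record, whereas broadcasting sends every record to all parallel instances. The plain modulo hashing below is a simplification; Flink's keyBy actually hashes keys into key groups first.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of record routing; not Flink's actual implementation.
class RoutingModel {

    // Hash-style routing: every record goes to exactly one parallel instance.
    static List<Integer> hashRoute(Object key, int parallelism) {
        List<Integer> targets = new ArrayList<>();
        targets.add(Math.floorMod(key.hashCode(), parallelism));
        return targets;
    }

    // Broadcast routing: every record goes to every parallel instance.
    static List<Integer> broadcastRoute(int parallelism) {
        List<Integer> targets = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            targets.add(i);
        }
        return targets;
    }

    public static void main(String[] args) {
        int parallelism = 4;
        System.out.println("data record 'user-7' -> " + hashRoute("user-7", parallelism));
        System.out.println("config record        -> " + broadcastRoute(parallelism));
    }
}
```

A keyed data record lands on a single instance, while a config record must reach all of them, which is why the two streams need different partitioning before they meet in one operator.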
We would have liked this to work:
dataStream.keyBy().connect(confStream.broadcast()).process(…RichCoProcessFunction()…)
but it says both streams must be keyed.

Is there any way to make this work?

Without keyBy, broadcasting works:
dataStream.connect(confStream.broadcast()).flatMap(... RichCoFlatMapFunction() …)
But we really want the KeyBy and ProcessFunction functionality (keyed state
and timers).
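A concrete version of the broadcast variant that does work might look like the following sketch (Flink 1.4-era API). The element types and the threshold-filter logic are hypothetical. Each parallel instance keeps the latest config value in a plain field, so there is no keyed state and no timer here, the field is not checkpointed, and the relative order of data and config records is not guaranteed.

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

class BroadcastFlatMapSketch {

    // Hypothetical filter: forwards data values above the latest broadcast threshold.
    // The threshold lives in a plain field, one copy per parallel instance.
    static class ThresholdFilter extends RichCoFlatMapFunction<Integer, Integer, Integer> {
        private int threshold = Integer.MIN_VALUE; // passes everything until config arrives

        @Override
        public void flatMap1(Integer event, Collector<Integer> out) {
            if (event > threshold) {
                out.collect(event);
            }
        }

        @Override
        public void flatMap2(Integer newThreshold, Collector<Integer> out) {
            threshold = newThreshold; // every instance receives this via broadcast()
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Integer> dataStream = env.fromElements(1, 5, 10);
        DataStream<Integer> confStream = env.fromElements(4);
        dataStream.connect(confStream.broadcast())
                  .flatMap(new ThresholdFilter())
                  .print();
        env.execute("broadcast-flatmap-sketch");
    }
}
```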

Thanks,
Anuj



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Datastream broadcast with KeyBy

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Anuj,

connecting a keyed stream and a broadcast stream is not supported at the
moment, but there is work in progress [1] to add this functionality in the
next release (Flink 1.5.0).

Best,
Fabian

[1] https://issues.apache.org/jira/browse/FLINK-3659
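For readers on Flink 1.5 or later, this functionality shipped as the broadcast state pattern: `confStream.broadcast(descriptor)` yields a `BroadcastStream`, and connecting a keyed stream to it lets a `KeyedBroadcastProcessFunction` use keyed state and timers on the data side while reading the broadcast config. The sketch below assumes Flink 1.5+; the descriptor name, the `"threshold"` entry, the parity key and the per-key count are all hypothetical.

```java
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

class BroadcastStateSketch {

    // Hypothetical broadcast state: config name -> value. The same descriptor is
    // used both to broadcast the stream and to access the state in the function.
    static final MapStateDescriptor<String, Integer> CONFIG = new MapStateDescriptor<>(
            "config", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);

    // Hypothetical key: parity of the value.
    static class ParityKey implements KeySelector<Integer, Integer> {
        @Override
        public Integer getKey(Integer value) {
            return value % 2;
        }
    }

    static class ApplyConfig extends KeyedBroadcastProcessFunction<Integer, Integer, Integer, String> {
        private transient ValueState<Long> countPerKey; // ordinary keyed state

        @Override
        public void open(Configuration parameters) {
            countPerKey = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Long.class));
        }

        @Override
        public void processElement(Integer value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
            Long seen = countPerKey.value();
            long updated = (seen == null ? 0L : seen) + 1;
            countPerKey.update(updated);
            // Keyed side: read-only access to broadcast state; null until config arrives.
            Integer threshold = ctx.getBroadcastState(CONFIG).get("threshold");
            if (threshold == null || value > threshold) {
                out.collect(value + " (count for this key: " + updated + ")");
            }
        }

        @Override
        public void processBroadcastElement(Integer value, Context ctx, Collector<String> out) throws Exception {
            // Every parallel instance receives the config element and updates its copy.
            ctx.getBroadcastState(CONFIG).put("threshold", value);
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Integer> dataStream = env.fromElements(1, 2, 3, 4, 5, 1, 2, 3);
        DataStream<Integer> confStream = env.fromElements(2);
        BroadcastStream<Integer> broadcastConf = confStream.broadcast(CONFIG);
        dataStream.keyBy(new ParityKey())
                  .connect(broadcastConf)
                  .process(new ApplyConfig())
                  .print();
        env.execute("broadcast-state-sketch");
    }
}
```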

Re: Datastream broadcast with KeyBy

Posted by Piotr Nowojski <pi...@data-artisans.com> on 2018-01-10 12:21 GMT+01:00.
Hi,

Could you elaborate on the problem you are having? Which exception(s) are you getting? I have tested the following simple example, and it seems to work as expected:

DataStreamSource<Integer> input = env.fromElements(1, 2, 3, 4, 5, 1, 2, 3);

DataStreamSource<Integer> confStream = env.fromElements(42);

input.keyBy(new MyKeySelector()).connect(confStream.broadcast()).process(new MyCoProcessFunction()).print();
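To make this snippet self-contained, `MyKeySelector` and `MyCoProcessFunction` need definitions; the thread does not show them, so the ones below are hypothetical. Because whether keyed state and timers are usable in this keyed/broadcast combination is exactly what is in question (see Fabian's reply about Flink 1.5), the function keeps the config in a plain field and registers no timers.

```java
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

class KeyedConnectBroadcastExample {

    // Hypothetical: keys each integer by itself.
    static class MyKeySelector implements KeySelector<Integer, Integer> {
        @Override
        public Integer getKey(Integer value) {
            return value;
        }
    }

    // Hypothetical: tags each data element with the latest config value seen
    // by this parallel instance (plain field, no keyed state, no timers).
    static class MyCoProcessFunction extends CoProcessFunction<Integer, Integer, String> {
        private Integer config; // null until the broadcast element arrives

        @Override
        public void processElement1(Integer value, Context ctx, Collector<String> out) {
            out.collect("data " + value + ", config " + config);
        }

        @Override
        public void processElement2(Integer value, Context ctx, Collector<String> out) {
            config = value;
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Integer> input = env.fromElements(1, 2, 3, 4, 5, 1, 2, 3);
        DataStreamSource<Integer> confStream = env.fromElements(42);
        input.keyBy(new MyKeySelector())
             .connect(confStream.broadcast())
             .process(new MyCoProcessFunction())
             .print();
        env.execute("keyed-connect-broadcast");
    }
}
```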

Thanks, Piotrek
