Posted to user@flink.apache.org by James Sandys-Lumsdaine <ja...@hotmail.com> on 2021/09/06 16:02:20 UTC

Broadcast data to all keyed streams

Hello,

I have a Flink workflow that is partitioned on a key which is common to all the stream objects and is best suited to the high volume of data I am processing. I now want to add a new stream of prices and make it available to all partitioned streams. However, this new stream of prices does not have this common keyBy value.

I have tried writing a piece of code using the broadcast() method (no args) to get this new price stream sent to all the parallel instances of an operator. The code looks like this:

KeyedStream<RefData> keyedRefDataStream = ....;

DataStream<Price> prices = ....;
DataStream<Price> broadcastPrices = prices.broadcast();

keyedRefDataStream
    .connect(broadcastPrices)
    .process(new RefDataPriceJoiner()); // implements KeyedCoProcessFunction

I then get an error saying the broadcastPrices stream must be keyed - but I can't key it on the same key as the refData stream because it lacks this field.

I could reshuffle all my data by re-keying the ref data on a different field but this will cause a huge amount of data to be sent over the network compared with me being able to broadcast this much smaller amount of data to my keyed streams. Note I am assuming this isn't a "broadcast state" example - I assume the broadcast() method allows me to send data to all partitions.

Is any of this possible? Any pointers would be very helpful, as I can't find an answer on the web or in the documentation.

Many thanks,

James.

Re: Broadcast data to all keyed streams

Posted by Dawid Wysakowicz <dw...@apache.org>.
Hi James,

Can you elaborate why the "Broadcast State Pattern"[1] does not work for
you? I'd definitely recommend that approach.


I strongly discourage this, but if you insist, you could copy the
ConnectedStreams#transform method and remove the check that ensures
both inputs of the operator are either keyed or non-keyed.


Best,

Dawid


[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/broadcast_state/


Re: Broadcast data to all keyed streams

Posted by Caizhi Weng <ts...@gmail.com>.
Hi!

Your RefDataPriceJoiner should extend KeyedBroadcastProcessFunction
instead of KeyedCoProcessFunction. See the Javadoc of DataStream#connect.
What's your Flink version, by the way?
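
For context on why the function type matters: in the DataStream API the no-argument broadcast() only changes how records are distributed and still returns a plain DataStream, so connect() treats it as an ordinary non-keyed input. The broadcast state pattern uses broadcast(MapStateDescriptor...) instead, which returns a BroadcastStream. Connecting a KeyedStream to that and calling process() accepts a KeyedBroadcastProcessFunction, which has two callbacks: processElement for the keyed side and processBroadcastElement for the broadcast side. The sketch below is not Flink code. It is a small dependency-free Java model of those semantics. The fields of RefData and Price, the Job class, the hash-modulo routing and the output format are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Dependency-free model of the broadcast state pattern; this is NOT Flink API code.
class BroadcastToKeyedModel {

    // Hypothetical stand-ins for the thread's RefData and Price types.
    static final class RefData {
        final String key;        // plays the role of the keyBy value
        final String instrument; // assumed field linking ref data to a price
        RefData(String key, String instrument) {
            this.key = key;
            this.instrument = instrument;
        }
    }

    static final class Price {
        final String instrument;
        final double value;
        Price(String instrument, double value) {
            this.instrument = instrument;
            this.value = value;
        }
    }

    // Models one parallel instance of the joining operator.
    static final class JoinerInstance {
        // Plays the role of broadcast state: each instance keeps a full copy.
        final Map<String, Double> latestPrice = new HashMap<>();
        final List<String> output = new ArrayList<>();

        // Models processBroadcastElement: invoked on every instance.
        void processBroadcastElement(Price p) {
            latestPrice.put(p.instrument, p.value);
        }

        // Models processElement: invoked only on the instance that owns the key.
        void processElement(RefData r) {
            Double price = latestPrice.get(r.instrument);
            if (price != null) {
                output.add(r.key + ":" + r.instrument + "@" + price);
            }
            // Ref data with no known price is dropped here to keep the model short.
        }
    }

    // Models the job wiring: keyed routing for ref data, replication for prices.
    static final class Job {
        final List<JoinerInstance> instances = new ArrayList<>();

        Job(int parallelism) {
            for (int i = 0; i < parallelism; i++) {
                instances.add(new JoinerInstance());
            }
        }

        // Every price goes to every parallel instance.
        void sendPrice(Price p) {
            for (JoinerInstance inst : instances) {
                inst.processBroadcastElement(p);
            }
        }

        // Ref data goes to exactly one instance (simplified hash-modulo routing).
        void sendRefData(RefData r) {
            int target = Math.floorMod(r.key.hashCode(), instances.size());
            instances.get(target).processElement(r);
        }
    }

    public static void main(String[] args) {
        Job job = new Job(4);
        job.sendPrice(new Price("ABC", 10.5));
        job.sendRefData(new RefData("k1", "ABC"));
        job.sendRefData(new RefData("k2", "ABC"));
        for (int i = 0; i < job.instances.size(); i++) {
            System.out.println("instance " + i + " -> " + job.instances.get(i).output);
        }
    }
}
```

Only the small price stream is replicated in this model, while the ref data stays on the partition chosen by its key, which is the network saving the original question is after. Flink does not guarantee any ordering between the keyed and the broadcast input, so a real job would also need to decide what to do with ref data that arrives before its price (for example, holding it in keyed state).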
