Posted to user@flink.apache.org by Ge...@swisscom.com on 2021/09/10 06:56:32 UTC
Questions regarding broadcast join in Flink
Hello, I am trying to implement a broadcast join of two streams in Flink using the broadcast state functionality. In my use case I have a large stream that will be enriched with a much smaller stream. To test my approach first, I have adapted the Taxi ride exercise in the official training repository (this one: https://github.com/apache/flink-training/blob/release-1.13/rides-and-fares/src/solution/scala/org/apache/flink/training/solutions/ridesandfares/scala/RidesAndFaresSolution.scala ), where the two streams are joined using .connect()
Instead, I have adapted my code as follows:
// The main function has been abbreviated for ease of reading
def main(): Unit = {
  // Main stream
  val rides = env
    .addSource(rideSourceOrTest(new TaxiRideGenerator()))
    .filter { ride => ride.isStart }
    // .keyBy { ride => ride.rideId }

  // Small stream
  val fares = env
    .addSource(fareSourceOrTest(new TaxiFareGenerator()))

  val broadcastStateDescriptor = new MapStateDescriptor[Long, TaxiFare]("fares_broadcast", classOf[Long], classOf[TaxiFare])

  val faresBroadcast: BroadcastStream[TaxiFare] = fares
    .broadcast(broadcastStateDescriptor)

  val result: DataStream[(TaxiRide, TaxiFare)] = rides
    .connect(faresBroadcast)
    .process(new BroadcastJoin())
}
// Type parameters are IN1, IN2, OUT: the non-broadcast stream type, the broadcast stream type, and the output type
class BroadcastJoin() extends BroadcastProcessFunction[TaxiRide, TaxiFare, (TaxiRide, TaxiFare)] {

  // Broadcast state descriptor (same name and types as the one used in main)
  private lazy val broadcastStateDescriptor = new MapStateDescriptor[Long, TaxiFare]("fares_broadcast", classOf[Long], classOf[TaxiFare])

  // Processes an element of the broadcast stream. The writable broadcast state is available through ctx.
  override def processBroadcastElement(value: TaxiFare, ctx: BroadcastProcessFunction[TaxiRide, TaxiFare, (TaxiRide, TaxiFare)]#Context, out: Collector[(TaxiRide, TaxiFare)]): Unit = {
    val broadcast_status: BroadcastState[Long, TaxiFare] = ctx.getBroadcastState(broadcastStateDescriptor)
    // Add the broadcast element to the broadcast state, which is kept in local memory
    broadcast_status.put(value.rideId, value)
  }

  // Processes an element of the non-broadcast stream. Only a read-only view of the broadcast state is available through ctx.
  override def processElement(value: TaxiRide, ctx: BroadcastProcessFunction[TaxiRide, TaxiFare, (TaxiRide, TaxiFare)]#ReadOnlyContext, out: Collector[(TaxiRide, TaxiFare)]): Unit = {
    // Read the broadcast state
    val broadcast_status: ReadOnlyBroadcastState[Long, TaxiFare] = ctx.getBroadcastState(broadcastStateDescriptor)
    if (broadcast_status.contains(value.rideId)) {
      val foundMatch = broadcast_status.get(value.rideId)
      out.collect((value, foundMatch)) // Emit the joined pair
    }
  }
}
I have limited the TaxiFare generator to produce only 20 samples. This approach seems to work, but I do not always get 20 joined samples (both generators emit samples with ids starting at 1 and increasing by one). I did some investigating, and what I believe is happening is this: if a fare is broadcast to at least one of the nodes (I have 4) before the corresponding ride from the main stream is processed, everything is fine and the two records are joined. However, if a record from the main stream is processed before the corresponding record from the small stream has been broadcast to at least one of the 4 nodes, the join never happens, because when processElement() is called, the lookup in the broadcast_status map finds nothing for that rideId.
There is clearly something wrong with this approach. If anyone has any idea of what I am doing wrong, I would very much appreciate any advice.
Thank you,
Gerald
Re: Questions regarding broadcast join in Flink
Posted by Timo Walther <tw...@apache.org>.
Hi Gerald,
actually, this is a typical issue when performing a streaming join.
An ideal solution would be to block the main stream until the broadcast
stream is ready. However, this is currently not supported in the API.
In any case, a user needs to handle this in a use case specific way to
declare when the broadcast state is "ready".
A solution could be to buffer events of the main stream until some
condition is met. The condition could be having at least 1 matching
record in the broadcast state or a timeout condition.
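
The buffering idea could be sketched roughly as follows. This is a plain Scala model of the logic only, not Flink code: `Ride`, `Fare`, `BufferingJoin`, `onRide` and `onFare` are hypothetical names standing in for TaxiRide, TaxiFare and the two process methods, and the timeout condition is left out. In an actual job the buffer would presumably have to live in Flink-managed state (for example keyed state in a KeyedBroadcastProcessFunction) rather than in a plain field.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-ins for TaxiRide and TaxiFare.
final case class Ride(rideId: Long)
final case class Fare(rideId: Long, total: Double)

// Flink-independent model of "buffer main-stream events until a match arrives".
class BufferingJoin {
  // Plays the role of the broadcast state: rideId -> fare.
  private val fares = mutable.Map.empty[Long, Fare]
  // Rides that arrived before their fare, grouped by rideId.
  private val pending = mutable.Map.empty[Long, mutable.ListBuffer[Ride]]

  // Called for each element of the small (broadcast) stream.
  // Stores the fare and releases every ride that was waiting for it.
  def onFare(fare: Fare): List[(Ride, Fare)] = {
    fares(fare.rideId) = fare
    pending.remove(fare.rideId)
      .map(_.toList.map(ride => (ride, fare)))
      .getOrElse(Nil)
  }

  // Called for each element of the main stream.
  // Emits immediately if the fare is known, otherwise buffers the ride.
  def onRide(ride: Ride): List[(Ride, Fare)] =
    fares.get(ride.rideId) match {
      case Some(fare) => List((ride, fare))
      case None =>
        pending.getOrElseUpdate(ride.rideId, mutable.ListBuffer.empty[Ride]) += ride
        Nil
    }
}
```

With this shape a ride that arrives before its fare is no longer dropped: it is emitted later, from the call that delivers the fare. Without a timeout, rides whose fare never arrives stay buffered forever, so a real implementation would need some expiry.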
You could also think about modifying the broadcast stream record with an
additional marker flag like `<messageType=[PAYLOAD|COMPLETENESS_MARKER],
payload>`. And only start the processing when receiving the completeness
marker in the broadcast stream.
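
A rough sketch of the marker variant, again as a plain Scala model rather than Flink code. All names here (`FareMessage`, `Payload`, `CompletenessMarker`, `MarkerGatedJoin`) are made up for illustration, and it assumes the broadcast side can actually say when it is complete, which only fits a bounded or slowly changing small stream.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-ins for TaxiRide and TaxiFare.
final case class Ride(rideId: Long)
final case class Fare(rideId: Long, total: Double)

// Broadcast records carry either a payload or a completeness marker.
sealed trait FareMessage
final case class Payload(fare: Fare) extends FareMessage
case object CompletenessMarker extends FareMessage

// Holds back main-stream events until the marker has been seen.
class MarkerGatedJoin {
  private val fares = mutable.Map.empty[Long, Fare]   // plays the role of the broadcast state
  private val held = mutable.ListBuffer.empty[Ride]   // rides received before the marker
  private var ready = false

  // Called for each record of the broadcast stream.
  def onBroadcast(msg: FareMessage): List[(Ride, Fare)] = msg match {
    case Payload(fare) =>
      fares(fare.rideId) = fare
      Nil
    case CompletenessMarker =>
      ready = true
      // Join everything that was held back; rides without a fare are dropped here.
      val released = held.toList.flatMap(ride => join(ride))
      held.clear()
      released
  }

  // Called for each element of the main stream.
  def onRide(ride: Ride): List[(Ride, Fare)] =
    if (ready) join(ride).toList
    else {
      held += ride
      Nil
    }

  private def join(ride: Ride): Option[(Ride, Fare)] =
    fares.get(ride.rideId).map(fare => (ride, fare))
}
```

Compared with per-record buffering, this keeps a single gate instead of per-key bookkeeping, at the cost of delaying every ride until the marker arrives.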
I hope this helps.
Regards,
Timo