Posted to user@flink.apache.org by Anil <an...@gmail.com> on 2018/10/02 21:31:43 UTC

Flink Checkpoint Barrier in case of Join

I'm trying to understand when Flink's stream barrier (for checkpoints) will be
emitted by the join operator.

Consider a query like:

SELECT a1.restaurantId, COUNT(*)
FROM stream_1 a1
INNER JOIN stream_2 a2 ON a2.orderId = a1.orderId
GROUP BY HOP(a1.proctime, INTERVAL '1' HOUR, INTERVAL '1' DAY), a1.restaurantId

Since I'm using a hopping window with a size of 1 day here, Flink will have
to keep an entire day's worth of events in state.
The join operator will receive stream barriers from its upstream operators.
It will then emit a stream barrier of its own, but I'm not sure on what basis,
or when, it will be emitted.
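To make the state-size concern concrete: in HOP(a1.proctime, INTERVAL '1' HOUR, INTERVAL '1' DAY) the first interval is the slide (1 hour) and the second is the window size (1 day), so every event belongs to 1 day / 1 hour = 24 overlapping windows. The minimal Python sketch below enumerates those windows for one timestamp. It is not Flink code: `hop_windows` is a hypothetical helper, and it assumes epoch-aligned windows with no offset and a size that is a multiple of the slide.

```python
from datetime import datetime, timezone

HOUR_MS = 60 * 60 * 1000
DAY_MS = 24 * HOUR_MS


def hop_windows(ts_ms: int, slide_ms: int, size_ms: int) -> list:
    """Return every [start, end) hopping window that contains ts_ms.

    Hypothetical helper: assumes epoch-aligned windows (offset 0) and a
    non-negative timestamp.
    """
    # Start of the most recent window that still contains the timestamp.
    start = ts_ms - (ts_ms % slide_ms)
    windows = []
    # Step back one slide at a time while the window still covers ts_ms.
    while start > ts_ms - size_ms:
        windows.append((start, start + size_ms))
        start -= slide_ms
    return windows


if __name__ == "__main__":
    ts = int(datetime(2018, 10, 2, 21, 31, tzinfo=timezone.utc).timestamp() * 1000)
    print(len(hop_windows(ts, slide_ms=HOUR_MS, size_ms=DAY_MS)))  # 24
```

Each of those 24 windows keeps its own aggregate for `a1.restaurantId`, which is why the operator state spans a full day.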

Any help will be appreciated. Thanks!


From Flink's documentation:

```
A core element in Flink’s distributed snapshotting are the stream
barriers. These barriers are injected into the data stream and flow with the
records as part of the data stream. The point where the barriers for
snapshot n are injected (let’s call it Sn) is the position in the source
stream up to which the snapshot covers the data. The barriers then flow
downstream. When an intermediate operator has received a barrier for
snapshot n from all of its input streams, it emits a barrier for snapshot n
into all of its outgoing streams. Once a sink operator (the end of a
streaming DAG) has received the barrier n from all of its input streams, it
acknowledges that snapshot n to the checkpoint coordinator. After all sinks
have acknowledged a snapshot, it is considered completed.
```
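The propagation rule quoted above can be sketched as a small simulation over a job graph. This is a hedged illustration, not Flink's implementation: the graph (`source_1`, `source_2`, `join`, `hop_window`, `sink`) and the helper `propagate_barrier` are hypothetical, and each operator is assumed to run as a single task. It shows that the join emits barrier n only after both of its inputs have delivered it, and that the checkpoint counts as complete only once every sink has acknowledged.

```python
from collections import defaultdict

# Hypothetical job graph for a join followed by a windowed aggregation.
EDGES = {
    "source_1": ["join"],
    "source_2": ["join"],
    "join": ["hop_window"],
    "hop_window": ["sink"],
    "sink": [],
}


def propagate_barrier(edges, n, source_order):
    """Simulate barrier n flowing through the DAG.

    source_order: sources in the order they inject barrier n.
    Returns (event log, True if every sink acknowledged checkpoint n).
    """
    inputs = defaultdict(set)
    for op, downstream in edges.items():
        for d in downstream:
            inputs[d].add(op)

    received = defaultdict(set)  # operator -> upstreams whose barrier arrived
    log, acked = [], set()

    def barrier_complete(op):
        # Called once `op` holds barrier n from all inputs (sources: at once).
        if not edges[op]:
            log.append(f"{op} acks checkpoint {n}")  # sink -> coordinator
            acked.add(op)
            return
        log.append(f"{op} emits barrier {n}")
        for d in edges[op]:
            received[d].add(op)
            # Forward only when the barrier arrived on ALL inputs of d.
            if received[d] == inputs[d]:
                barrier_complete(d)

    for src in source_order:
        barrier_complete(src)
    sinks = {op for op, ds in edges.items() if not ds}
    return log, acked == sinks


if __name__ == "__main__":
    log, completed = propagate_barrier(EDGES, 7, ["source_1", "source_2"])
    print("\n".join(log))
    print("completed:", completed)  # completed: True
```

If only `source_1` injects the barrier, the join never forwards it and the checkpoint stays incomplete.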

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Flink Checkpoint Barrier in case of Join

Posted by Hequn Cheng <ch...@gmail.com>.
Hi Anil,

The join operator behaves the same as other operators.
When a non-source task receives a barrier from one of its inputs, it blocks
that input until it receives a barrier from all inputs. When barriers have
been received from all the inputs, the task takes a snapshot of its current
state and broadcasts the barrier to its outputs. Then, the task unblocks
its input channels to continue its computation[1].
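The alignment steps described above can be illustrated with a toy model of a single two-input task such as the join. This is a hedged Python sketch, not Flink's runtime code: `AlignedTwoInputTask`, the `BARRIER` marker and the list-based state are hypothetical, and only the aligned mode used for exactly-once checkpoints is modeled. Records that arrive on an input after its barrier are held back, so they do not end up in the snapshot.

```python
from dataclasses import dataclass, field

BARRIER = "BARRIER"  # hypothetical marker standing in for a checkpoint barrier


@dataclass
class AlignedTwoInputTask:
    """Toy two-input task (e.g. a join) performing barrier alignment."""
    state: list = field(default_factory=list)      # records processed so far
    snapshots: list = field(default_factory=list)  # one state copy per checkpoint
    output: list = field(default_factory=list)     # what goes downstream
    blocked: set = field(default_factory=set)      # inputs that delivered the barrier
    buffer: list = field(default_factory=list)     # records held back from blocked inputs

    def receive(self, channel: int, item):
        if channel in self.blocked:
            # Input is blocked until the barrier arrives on the other input.
            self.buffer.append((channel, item))
            return
        if item == BARRIER:
            self.blocked.add(channel)
            if len(self.blocked) == 2:
                self._complete_alignment()
            return
        self.state.append(item)
        self.output.append(item)

    def _complete_alignment(self):
        self.snapshots.append(list(self.state))  # 1. snapshot current state
        self.output.append(BARRIER)              # 2. broadcast barrier downstream
        self.blocked.clear()                     # 3. unblock the inputs
        pending, self.buffer = self.buffer, []
        for channel, item in pending:            # 4. resume with held-back input
            self.receive(channel, item)


if __name__ == "__main__":
    task = AlignedTwoInputTask()
    for ch, item in [(0, "a1"), (0, BARRIER), (0, "a2"), (1, "b1"), (1, BARRIER)]:
        task.receive(ch, item)
    print(task.snapshots)  # [['a1', 'b1']]
    print(task.output)     # ['a1', 'b1', 'BARRIER', 'a2']
```

In this model, as with Flink's aligned checkpoints, the barrier does not wait for any window to fire: whatever the operator has accumulated so far (for example, a day's worth of join or window state) is simply included in the snapshot.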

Best, Hequn
[1]
https://ci.apache.org/projects/flink/flink-docs-master/internals/stream_checkpointing.html#barriers
