Posted to user@flink.apache.org by Ruibin Xing <xi...@gmail.com> on 2022/02/16 11:04:07 UTC

Implement watermark buffering with Process Function

Hi,

I'm trying to implement customized state logic with KeyedProcessFunction.
But I'm not quite sure how to implement the correct watermark behavior when
late data is involved.

According to this answer on Stack Overflow:
https://stackoverflow.com/questions/59468154/how-to-sort-an-out-of-order-event-time-stream-using-flink
there should be a state that buffers all events until the watermark passes
the expected time, and an event-time trigger that then fetches them from the
state and does the calculation. The buffer type should be Map<T, List<E>>,
where T is the timestamp and E is the event type.
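
For illustration, here is a minimal plain-Java model of the buffering described above. It is only a sketch and does not use the Flink API: the class name WatermarkBufferSketch and the methods onEvent and onWatermark are hypothetical, and it assumes an event counts as late when its timestamp is less than or equal to the current watermark.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Plain-Java model of event-time buffering (hypothetical names, not Flink API). */
final class WatermarkBufferSketch<E> {
    // timestamp -> events carrying that timestamp, ordered by timestamp
    private final TreeMap<Long, List<E>> buffer = new TreeMap<>();
    private long currentWatermark = Long.MIN_VALUE;

    /** Buffers an event; returns false if it is late (timestamp <= watermark). */
    boolean onEvent(long timestamp, E event) {
        if (timestamp <= currentWatermark) {
            return false; // late: the caller decides whether to drop or redirect it
        }
        buffer.computeIfAbsent(timestamp, t -> new ArrayList<>()).add(event);
        return true;
    }

    /** Advances the watermark and releases every buffered event at or below it, in timestamp order. */
    List<E> onWatermark(long watermark) {
        currentWatermark = Math.max(currentWatermark, watermark);
        List<E> ready = new ArrayList<>();
        while (!buffer.isEmpty() && buffer.firstKey() <= currentWatermark) {
            ready.addAll(buffer.pollFirstEntry().getValue());
        }
        return ready;
    }
}
```

In a real KeyedProcessFunction the map would presumably live in keyed state and the release step would run from an event-time timer; the model only shows the ordering and lateness logic.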

However, the interface Flink currently provides is only MapState<K, V>.
If V is a List<E> that buffers all events, then every time an event arrives
Flink will serialize and deserialize the whole list, which could be very
expensive at high throughput.
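
To make the cost concrete, here is a rough plain-Java sketch of what appending to a list-valued map entry looks like when values are stored in serialized form, as they are in a backend such as RocksDB. Nothing here is Flink API: SerializedListMapSketch, its methods, and the simple int-plus-longs encoding are all assumptions made for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical byte-backed map that models a serializing state backend. */
final class SerializedListMapSketch {
    private final Map<Long, byte[]> store = new HashMap<>();
    long bytesSerialized = 0; // total bytes written across all appends

    /** Appends one event by reading, extending, and rewriting the whole list. */
    void append(long timestamp, long event) {
        List<Long> events = read(timestamp); // deserializes the full list
        events.add(event);
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeInt(events.size());
            for (long e : events) {
                out.writeLong(e);
            }
            out.flush();
            byte[] encoded = bytes.toByteArray();
            bytesSerialized += encoded.length; // grows with the list length
            store.put(timestamp, encoded);
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
    }

    /** Deserializes the list stored under the timestamp (empty if absent). */
    List<Long> read(long timestamp) {
        List<Long> events = new ArrayList<>();
        byte[] encoded = store.get(timestamp);
        if (encoded == null) {
            return events;
        }
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(encoded));
            int n = in.readInt();
            for (int i = 0; i < n; i++) {
                events.add(in.readLong());
            }
            return events;
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
    }
}
```

In this model, three appends under the same timestamp write 12 + 20 + 28 = 60 bytes, so the total work grows quadratically with the number of events that share a timestamp.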

I checked the built-in window implementation, which implements the watermark
buffering. It seems that WindowOperator consists of some InternalStates
whose signature uses the window as a namespace or key, if I understand
correctly. But internal states are not available to Flink users.

So my question is: as a Flink user, is there an efficient way to simulate
watermark buffering with a process function?

Thanks.

Re: Implement watermark buffering with Process Function

Posted by David Anderson <da...@apache.org>.
I've done some work on this with Nico Kruber.

In our benchmarking, the performance loss (from not being able to use the
namespace) was roughly a factor of two, so it is significant. We prototyped
an API extension that addresses this particular concern but without
exposing the namespace directly, which I believe there is some reluctance
to do. I've been thinking of turning this into a FLIP, but it needs more
work first.

Another direction that could be explored would be to use finer-grained
timestamps. E.g., with nanosecond-precision timestamps the number of
colliding events will be dramatically smaller.
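
As a rough illustration of the collision argument, the plain-Java sketch below counts how many events land in a timestamp bucket that is already occupied, first at millisecond and then at nanosecond granularity. The class TimestampCollisionSketch and its collisions method are hypothetical names, and the sample timestamps are made up for the example.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper that compares timestamp collisions at two precisions. */
final class TimestampCollisionSketch {
    /** Counts events whose bucket already holds at least one earlier event. */
    static int collisions(long[] timestampsNanos, long nanosPerBucket) {
        Map<Long, Integer> perBucket = new HashMap<>();
        int collisions = 0;
        for (long ts : timestampsNanos) {
            long bucket = Math.floorDiv(ts, nanosPerBucket);
            int seen = perBucket.merge(bucket, 1, Integer::sum);
            if (seen > 1) {
                collisions++;
            }
        }
        return collisions;
    }

    public static void main(String[] args) {
        long[] ts = {1_000_000L, 1_000_500L, 1_999_999L, 2_000_000L};
        System.out.println("ms buckets: " + collisions(ts, 1_000_000L)); // 2
        System.out.println("ns buckets: " + collisions(ts, 1L));         // 0
    }
}
```

With fewer events per map key, each list value stays short, so the read-and-rewrite cost per append stays small.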

David

On Wed, Feb 16, 2022 at 10:17 PM David Anderson <da...@apache.org>
wrote:

> I'm afraid not. The DataStream window implementation uses internal APIs to
> manipulate the state backend namespace, which isn't possible to do with the
> public-facing API. And without this, you can't implement this as
> efficiently.
>
> David

Re: Implement watermark buffering with Process Function

Posted by David Anderson <da...@apache.org>.
I'm afraid not. The DataStream window implementation uses internal APIs to
manipulate the state backend namespace, which isn't possible to do with the
public-facing API. And without this, you can't implement this as
efficiently.

David
