Posted to user@flink.apache.org by Sush Bankapura <su...@man-es.com> on 2020/08/04 13:32:46 UTC

Re: Support for Event time clock specific to each stream in parallel streams

Hi David,

Thanks for the quick reply.

You are indeed right: "key-based watermarking support" in Flink would be most useful.

From this mail thread, I gather that this cannot be implemented in Flink: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Maintaining-watermarks-per-key-instead-of-per-operator-instance-td7288.html

I doubt that the situation has changed since then.

Could this be considered as a feature to support in the future?

For now, it seems that we need to make do with workarounds, which we have implemented. However, they introduce a few limitations.

Please find my other comments inline.

Thanks again,
Sush


On 2020/07/31 12:36:07, David Anderson <da...@alpinegizmo.com> wrote: 
> It sounds like you would like to have something like event-time-based
> windowing, but with independent watermarking for every key. An approach
> that can work, but it is somewhat cumbersome, is to not use watermarks or
> windows, but instead put all of the logic in a KeyedProcessFunction (or
> RichFlatMap). In this way you are free to implement your own policy for
> deciding when a given "window" for a specific key (device) is ready for
> processing, based solely on observing the events for that specific key.

[SUSH] We implemented our own custom solution using processing-time semantics. However,
this has the following drawbacks:
(a) There is a high chance of multiple timers being triggered for the same key. This leads to timer deduplication, which causes data to be "ignored".
(b) The behavior is undesirable, because not all data for the key may be available when the window is triggered.
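For contrast with the processing-time workaround, the per-key policy David describes can also be driven by event time. The following is a minimal sketch in plain Java, not Flink code: the class PerKeyTumblingWindows, its parameters and its output format are hypothetical, and in a real job this bookkeeping would have to live in keyed state inside a KeyedProcessFunction. Each key's clock advances only on that key's own events, so a window for a device is emitted only after that device's data has moved past it, and events from other devices cannot make it late.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch (plain Java, no Flink dependency): the bookkeeping a
// KeyedProcessFunction could keep in keyed state to run tumbling event-time
// "windows" that are driven only by each key's own events.
final class PerKeyTumblingWindows {
    private final long windowSizeMs;
    private final long maxOutOfOrdernessMs;
    // Per-key event-time clock: highest timestamp seen for the key minus the bound.
    private final Map<String, Long> keyWatermark = new HashMap<>();
    // Per-key open windows: window start -> event count (stand-in for any aggregate).
    private final Map<String, TreeMap<Long, Integer>> openWindows = new HashMap<>();
    private long lateEventsDropped = 0;

    PerKeyTumblingWindows(long windowSizeMs, long maxOutOfOrdernessMs) {
        this.windowSizeMs = windowSizeMs;
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    /** Handles one event and returns the windows of this key that became complete. */
    List<String> onEvent(String key, long timestampMs) {
        List<String> fired = new ArrayList<>();
        long watermark = keyWatermark.getOrDefault(key, Long.MIN_VALUE);
        long windowStart = timestampMs - Math.floorMod(timestampMs, windowSizeMs);

        // Late only relative to this key's own clock: the window was already emitted.
        if (windowStart + windowSizeMs - 1 <= watermark) {
            lateEventsDropped++;
            return fired;
        }
        TreeMap<Long, Integer> windows = openWindows.computeIfAbsent(key, k -> new TreeMap<>());
        windows.merge(windowStart, 1, Integer::sum);

        // Advance this key's clock; it never moves backwards.
        watermark = Math.max(watermark, timestampMs - maxOutOfOrdernessMs);
        keyWatermark.put(key, watermark);

        // Emit every window whose last timestamp (end - 1) this key's clock has reached.
        while (!windows.isEmpty() && windows.firstKey() + windowSizeMs - 1 <= watermark) {
            Map.Entry<Long, Integer> w = windows.pollFirstEntry();
            fired.add(key + " [" + w.getKey() + "," + (w.getKey() + windowSizeMs)
                    + ") count=" + w.getValue());
        }
        return fired;
    }

    long lateEventsDropped() {
        return lateEventsDropped;
    }
}
```

For example, with 10 ms windows and no out-of-orderness bound, an event for key "b" at t=100 does not affect key "a": a following "a" event at t=5 still lands in the open [0,10) window, which is emitted once "a" itself reaches t=12. The trade-off is the one David mentions: an idle key's last window stays open until that key sends another event, and state for keys that never return would need separate cleanup.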


> Semantically I think this is similar to running a separate instance of the
> job for each source, but with multi-tenancy, and with an impoverished API
> (no watermarks, no event time timers, no event time windows).
> 
> Note that it is already the case that each parallel instance of an operator
> has its own, independent notion of the current watermark.

[SUSH] We have observed all parallel instances of an operator sharing a single watermark across all of its subtasks, especially when the operator follows a keyBy operator.
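One way to read this observation is through the rule from the Flink documentation quoted in the original question: a subtask's event-time clock is the minimum of the watermarks on its input channels. After a keyBy, every downstream subtask typically receives input from every upstream subtask, so all downstream subtasks take the minimum over the same set of watermarks and end up with the same value. The sketch below is a simplified model under that assumption; SubtaskEventTimeClock is a hypothetical class, not part of Flink, and idleness handling is ignored.

```java
import java.util.Arrays;

// Hypothetical model (not Flink's internal classes): a downstream subtask keeps
// the latest watermark seen on each upstream input channel, and its own
// event-time clock is the minimum over all channels.
final class SubtaskEventTimeClock {
    private final long[] channelWatermarks;
    private long currentWatermark = Long.MIN_VALUE;

    SubtaskEventTimeClock(int numInputChannels) {
        channelWatermarks = new long[numInputChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE); // nothing received yet
    }

    /** Records a watermark from one channel and returns this subtask's watermark. */
    long onWatermark(int channel, long watermark) {
        // Per-channel watermarks only move forward.
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        long min = Arrays.stream(channelWatermarks).min().getAsLong();
        // The subtask's clock is the minimum over channels and is also monotonic.
        currentWatermark = Math.max(currentWatermark, min);
        return currentWatermark;
    }
}
```

With two upstream sources, one for a device that is online (watermark 1000) and one for a device that is offline (stuck at 10), both downstream subtasks report 10 until the offline device's watermark moves, which matches the single shared watermark described above.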


> I believe your
> problems arise from the fact that this current watermark is applied to all
> events processed by that instance, regardless of their keys. I believe you
> would like each key to maintain its own current watermark (event time
> clock), so if one key (device) is idle, its watermark will wait for further
> events to arrive. As it is now, events for other keys processed by the same
> operator instance (or subtask) will advance the shared watermark, causing
> an idle device's events to become late.

[SUSH] Yes. This is exactly what is needed: key-based watermark support, or at least support for subtask-specific watermarks. The latter could be an option where there are only a few keys and such an implementation does not cause any side effects.

> Regards,
> David
> 
> On Fri, Jul 31, 2020 at 1:42 PM Sush Bankapura <sushrutha.bankapura@man-es.com> wrote:
> 
> > Hi,
> >
> > We have a single Flink job that works on data from multiple data sources.
> > These data sources are not aligned in time and also have intermittent
> > connectivity, with outages lasting for days, due to which data arrives late.
> >
> > We attempted to use event time and watermarks with parallel streams,
> > using keyBy on the data source.
> >
> > In the case of parallel streams, for certain operators, the event-time
> > clock across all the subtasks of the operator is the minimum of the
> > watermarks among all of its input streams.
> >
> > Reference:
> > https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/event_time.html#watermarks-in-parallel-streams
> >
> > While this seems to be a fundamental concept of Flink, are there any plans
> > to have an event-time clock per operator per subtask for such operators?
> >
> > This is causing us not to use watermarks and to fall back on
> > processing-time semantics or, in the worst case, to run the same Flink job
> > for each and every data source from which we are collecting data through Kafka.
> >
> > Thanks,
> > Sush
> >
>