Posted to dev@flink.apache.org by Dawid Wysakowicz <dw...@apache.org> on 2022/12/02 13:20:35 UTC

Re: [DISCUSS] FLIP-269: Properly Handling the Processing Timers on Job Termination

Ad. 1

I'd start with ProcessingTimeService as that's the only public
interface. It is exposed in the Sink V2 interface. In this scenario it
would be the Sink interface that needs to extend an EOFTimersHandler.
I believe it would be hard to pass it from there to the
ProcessingTimeService, as that is passed in from the outside, e.g. via
ProcessingTimeServiceAware. For that reason I'd go with a registration
method in that interface.

In ProcessFunction I'd go with a mixin approach, so a ProcessFunction 
can extend from EOFTimersHandler. I'd do that because ProcessFunction 
does not have an init/open method where we could register the handler.

On the operator level I'd have a registration method in
InternalTimerService. I believe that's the only way to handle the above
ProcessFunction approach. E.g. in KeyedProcessOperator you need to check
whether the UDF implements the interface, not the operator itself.
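
To make the wiring in Ad. 1 a bit more concrete, here is a minimal Java sketch, not a proposal for the final API. SketchProcessFunction, SketchTimerService, SketchOperator and registerEOFTimersHandler are hypothetical stand-ins rather than Flink classes; only the name EOFTimersHandler comes from this mail. It shows the operator checking whether the UDF (not the operator) implements the handler and registering it on the timer service:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical handler interface, named after the proposal in this mail.
interface EOFTimersHandler {
    void handleProcessingTimer(long timestamp);
}

// Stand-in for a user function type; not Flink's real ProcessFunction.
abstract class SketchProcessFunction {
    abstract void processElement(String value);
}

// Stand-in for InternalTimerService with the proposed registration method.
class SketchTimerService {
    private final List<Long> pendingTimers = new ArrayList<>();
    private EOFTimersHandler eofHandler; // null means: pending timers are dropped

    void registerProcessingTimeTimer(long timestamp) {
        pendingTimers.add(timestamp);
    }

    void registerEOFTimersHandler(EOFTimersHandler handler) {
        this.eofHandler = handler;
    }

    // Called once at end of input: hands every pending timer to the handler.
    void finish() {
        if (eofHandler != null) {
            for (long timestamp : pendingTimers) {
                eofHandler.handleProcessingTimer(timestamp);
            }
        }
        pendingTimers.clear();
    }
}

// Stand-in for KeyedProcessOperator: it inspects the UDF, not itself.
class SketchOperator {
    final SketchTimerService timerService = new SketchTimerService();

    SketchOperator(SketchProcessFunction udf) {
        if (udf instanceof EOFTimersHandler) {
            timerService.registerEOFTimersHandler((EOFTimersHandler) udf);
        }
    }
}

class EofMixinSketch {
    // A user function that opts in by also implementing the handler (the mixin).
    static class FlushingFunction extends SketchProcessFunction implements EOFTimersHandler {
        final List<Long> handled = new ArrayList<>();

        @Override
        void processElement(String value) { }

        @Override
        public void handleProcessingTimer(long timestamp) {
            handled.add(timestamp);
        }
    }

    static List<Long> run() {
        FlushingFunction udf = new FlushingFunction();
        SketchOperator operator = new SketchOperator(udf);
        operator.timerService.registerProcessingTimeTimer(100L);
        operator.timerService.registerProcessingTimeTimer(200L);
        operator.timerService.finish();
        return udf.handled;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

A function that does not implement the interface is simply never registered, so in this sketch its pending timers are dropped on finish.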

Ad. 2

I'd go with

*(Keyed)ProcessFunction:*

interface EOFTimersHandler {

  void handleProcessingTimer(long timestamp, Context ctx);

}

interface Context {
         public abstract <X> void output(OutputTag<X> outputTag, X value);

         public abstract K getCurrentKey();

// we can extend it for waitFor later

}

*ProcessingTimeService:*

interface EOFTimersHandler {

  void handleProcessingTimer(long timestamp, Context ctx);

}

interface Context {

// we can extend it for waitFor later

}

*InternalTimerService:*

interface EOFTimersHandler {

  void handleProcessingTimer(InternalTimer<K,N> timer, Context ctx);

}

interface Context {

// we can extend it for waitFor later

}

Personally I'd not try to unify those places too much. They also have
different visibilities (public/internal) and access to different sets
of metadata (key/namespace).
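
As a rough illustration of the (Keyed)ProcessFunction variant, the following Java sketch shows a handler that uses the key exposed by the context to flush a per-key result at end of input. It is only a sketch under assumptions: KeyedEOFContext and KeyedEOFTimersHandler are hypothetical names, the context is simplified to a single main output instead of OutputTag-based side outputs, and the loop in run() merely imitates a runtime that sets the current key before each call.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified shapes; K is the key type, OUT the output type.
interface KeyedEOFContext<K, OUT> {
    K getCurrentKey();

    void output(OUT value);
}

interface KeyedEOFTimersHandler<K, OUT> {
    void handleProcessingTimer(long timestamp, KeyedEOFContext<K, OUT> ctx);
}

class KeyedEofSketch {
    // Emits the buffered count of the timer's key instead of waiting for the timer.
    static class CountFlusher implements KeyedEOFTimersHandler<String, String> {
        final Map<String, Integer> counts = new HashMap<>();

        @Override
        public void handleProcessingTimer(long timestamp, KeyedEOFContext<String, String> ctx) {
            String key = ctx.getCurrentKey();
            ctx.output(key + "=" + counts.getOrDefault(key, 0));
        }
    }

    static List<String> run() {
        CountFlusher handler = new CountFlusher();
        handler.counts.put("a", 3);
        handler.counts.put("b", 1);
        List<String> emitted = new ArrayList<>();
        // Imitates the runtime: one pending timer per key, key set before the call.
        for (String key : List.of("a", "b")) {
            handler.handleProcessingTimer(1000L, new KeyedEOFContext<String, String>() {
                @Override
                public String getCurrentKey() {
                    return key;
                }

                @Override
                public void output(String value) {
                    emitted.add(value);
                }
            });
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Note that the context here deliberately offers no way to register timers, which is the point of keeping it separate from the regular onTimer context.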


Ad. 3

I don't like having the trigger/cancel methods, because:

1. I don't like the back and forth between system and UDF.

2. Yes, the biggest issue I have is with the possibility of
registering new timers. I am trying to be on the safe side here. I don't
like the idea of dropping them, because it is again making assumptions
about what users do with those timers. What if they e.g. emit a counter
once it has reached a certain threshold? We'd need an additional flag in
the method saying that this is the final timer. My sentiment is that
we're making it questionably easier to trigger a timer at the cost of
opening up unforeseen problems with follow-up registration.
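
To illustrate the follow-up registration concern, here is a small Java sketch under stated assumptions: PeriodicCounter is a made-up user function and drain() is a toy end-of-input loop, neither of which exists in Flink. The function re-registers a timer on every firing and only emits its result on the third firing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

class FollowUpTimerSketch {
    // Hypothetical periodic user logic: every firing bumps a counter and
    // re-registers; the result is emitted only once the threshold is reached.
    static class PeriodicCounter {
        int fired = 0;
        final List<String> emitted = new ArrayList<>();

        void onTimer(long timestamp, PriorityQueue<Long> timers) {
            fired++;
            if (fired == 3) {
                emitted.add("count=" + fired);
            } else {
                timers.add(timestamp + 10); // follow-up registration
            }
        }
    }

    // Fires all pending timers at end of input by calling the regular onTimer.
    // dropNewTimers == true models "ignore timers registered during termination".
    static PeriodicCounter drain(boolean dropNewTimers) {
        PeriodicCounter udf = new PeriodicCounter();
        PriorityQueue<Long> pending = new PriorityQueue<>();
        pending.add(10L);
        while (!pending.isEmpty()) {
            long timestamp = pending.poll();
            // When dropping, registrations go to a throwaway queue.
            PriorityQueue<Long> target = dropNewTimers ? new PriorityQueue<Long>() : pending;
            udf.onTimer(timestamp, target);
        }
        return udf;
    }

    public static void main(String[] args) {
        System.out.println("dropping: " + drain(true).emitted);
        System.out.println("keeping:  " + drain(false).emitted);
    }
}
```

With dropping, the function fires once and never emits its count. With follow-up timers kept, this particular function terminates after three firings, but a function that always re-registers would keep the drain loop running forever.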

Best,

Dawid

On 30/11/2022 12:13, Yun Gao wrote:
> Hi Dawid, Piotr
> Many thanks for the discussion!
> As a whole I think we already agree on the callback option, and I don't
> think I objected to modifying the current internal implementation. But from
> my side it is still not clear what the proposed interfaces actually are. Let me first try
> to summarize that a bit:
> 1) Which object do the handlers register on?
> It seems there are two options: one is the timer services (InternalTimerService
> / ProcessingTimerService or some equivalent after refactoring), the other
> is as part of the lifecycle of the operator. I'm now leaning towards the latter; what do
> you think about this part?
> 2) What is the interface of the handler?
> Option 1 is that
> interface SomeHandlerName {
>   void processingTimer(Timer timer);
> }
> class Timer {
>   long getTimestamp();
>   void trigger();
>   void cancel();
>   // Other actions if required.
> }
> But it seems there is controversy on whether to add actions to the timer class.
> Without them, as I understand it, the interfaces of Option 2 are
> interface SomeHandlerName {
>   void processTimer(Timer timer);
> }
> interface KeyedSomeHandlerName<KEY, NAMESPACE> {
>   void processKeyedTimer(KeyedTimer<KEY, NAMESPACE> timer, Context ctx);
> }
> class Timer {
>   long getTimestamp();
> }
> class KeyedTimer<KEY, NAMESPACE> extends Timer {
>   KEY getKey();
>   NAMESPACE getNamespace();
> }
> interface Context {
>   void executeAtScheduledTime(Consumer<Timer> handler);
> }
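
For readers trying to picture how the "waitFor" hook in Option 2 could behave, here is a hedged Java sketch. Timer, Context and PendingTimerHandler below are simplified hypothetical types loosely following the interfaces quoted above, and the "waiting" is only simulated by running deferred work after all handlers have been called; a real runtime would have to block until each timer's scheduled time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class WaitForSketch {
    // Hypothetical read-only timer view.
    static final class Timer {
        private final long timestamp;

        Timer(long timestamp) {
            this.timestamp = timestamp;
        }

        long getTimestamp() {
            return timestamp;
        }
    }

    // Hypothetical context: the only extra capability is deferring to the scheduled time.
    interface Context {
        void executeAtScheduledTime(Consumer<Timer> handler);
    }

    interface PendingTimerHandler {
        void processTimer(Timer timer, Context ctx);
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        PendingTimerHandler handler = (timer, ctx) -> {
            if (timer.getTimestamp() < 100) {
                log.add("flush@" + timer.getTimestamp()); // handled right away
            } else {
                ctx.executeAtScheduledTime(t -> log.add("waited@" + t.getTimestamp()));
            }
        };

        List<Timer> pending = List.of(new Timer(50), new Timer(150));
        List<Runnable> deferred = new ArrayList<>();
        for (Timer timer : pending) {
            // The context captures the timer so the deferred handler can receive it later.
            handler.processTimer(timer, h -> deferred.add(() -> h.accept(timer)));
        }
        // Simulated wait: deferred handlers run after the immediate ones.
        deferred.forEach(Runnable::run);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```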
> As Piotr has pointed out, if we could eliminate the namespace logic, we could then
> remove the namespace-related type parameter and method from the interfaces.
> Do I understand this correctly?
> Besides, I still haven't fully understood why we should not add the actions to the
> timer class, considering that in most cases users could implement their
> logic by simply calling timer.trigger() (I think the repeated registration is indeed a
> problem, but I think we could ignore the timers registered during termination).
> Could you enlighten me a bit further on this part?
> Best,
> Yun Gao
> ------------------------------------------------------------------
> From:Piotr Nowojski<pn...@apache.org>
> Send Time:2022 Nov. 30 (Wed.) 17:10
> To:dev<de...@flink.apache.org>
> Subject:Re: [DISCUSS] FLIP-269: Properly Handling the Processing Timers on Job Termination
> Hi,
> I have a couple of remarks.
> First a general one. For me the important part in the design of this API is
> how to expose this to Flink users in public interfaces. Namely
> ProcessFunction and StreamOperator. InternalTimerService is an internal
> class, so we can change it and break it as needed in the future.
> For registering a handler like proposed by Dawid:
> interface SomeHandlerName {
>   void onTimer(/* whatever type it is */ timer, Context ctx ) {
>   }
> }
> makes sense to me. For the InternalTimerService I think it doesn't matter
> too much what we do. We could provide a similar interface as for the
> ProcessFunction/StreamOperator, it doesn't have to be the same one. On the
> contrary, I think it shouldn't be the same, as part of this effort we
> shouldn't be exposing the concept of `Namespaces` to the public facing API.
> Re the "waitFor". Theoretically I see arguments why users might want to use
> this, but I'm also not convinced whether that's necessary in practice. I
> would be +1 either way. First version can be without this functionality and
> we can add it later (given that we designed a good place to add it in the
> future, like the `Context` proposed by Dawid). But I'm also fine adding it
> now if others are insisting.
> Best,
> Piotrek
> On Wed, 30 Nov 2022 at 09:18, Dawid Wysakowicz<dw...@apache.org>
> wrote:
>> WindowOperator is not implemented by users. I can see that for
>> InternalTimerService we'll need
>>
>> interface PendingTimerProcessor<KEY, NAMESPACE> {
>>   void onTimer(InternalTimer<KEY, NAMESPACE> timer) {
>>     doHandleTimer(timer);
>>   }
>> }
>>
>> I don't see a problem with that.
>>
>> As you said ProcessingTimeService is a user facing interface and
>> completely unrelated to the InternalTimerService. I don't see a reason
>> why we'd need to unify those.
>>
>> As for the waitFor behaviour. Personally, I have not been convinced it
>> is necessary. Maybe it's just my lack of vision, but I can't think of a
>> scenario where I'd use it. Still if we need it, I'd go for something like:
>>
>> void onTimer(/* whatever type it is */ timer, Context ctx ) {
>>
>> }
>>
>> interface Context {
>> void executeAtScheduledTime(Consumer<timer> handler);
>> }
>>
>>
>> That way you have independent simple interfaces that need to work only
>> in a single well defined scenario and you don't need to match an
>> interface to multiple different cases.
>>
>> Best,
>> Dawid
>>
>> On 30/11/2022 07:27, Yun Gao wrote:
>>> Hi Dawid,
>>> Thanks for the comments!
>>> As a whole I'm also open to the API and I also prefer simple
>>> but flexible interfaces, but it still looks like there are some problems
>>> with just letting users implement the termination actions.
>>> Let's take the WindowOperator as an example. As seen in [1],
>>> the timer processing logic needs to acquire the key / namespace
>>> information bound to the timer (which is only supported by the
>>> InternalTimerService).
>>> Thus if we want users to implement the same logic on termination, we
>>> either let users trigger the timer handler directly or we also allow
>>> users to access these pieces of information. If we go in the latter
>>> direction, we might need to provide interfaces like
>>> interface PendingTimerProcessor<KEY, NAMESPACE> {
>>>   void onTimer(Timer<KEY, NAMESPACE> timer) {
>>>     doHandleTimer(timer);
>>>   }
>>> }
>>> class Timer<KEY, NAMESPACE> {
>>>   long getTimestamp();
>>>   KEY getKey();
>>>   NAMESPACE getNamespace();
>>> }
>>> Then we'll have the issue that, since we need the interface to handle
>>> both the InternalTimerService and the raw ProcessingTimeService, the
>>> latter does not have key and namespace information attached, and it would
>>> also be a bit inconsistent for users to have to set the KEY and NAMESPACE
>>> types.
>>> Besides, it looks to me that if we want to implement behaviors like
>>> waiting for a timer, it might not be possible to simply reuse the timer
>>> handler, so every operator author would have to re-implement such
>>> waiting logic.
>>>> Moreover it still have the downside that if you call back to the
>>>> `onTimer` method after `trigger` you have access to the Context which
>>>> lets you register new timers.
>>> I think we could simply drop the timers registered after we start
>>> processing the pending timers on termination. Logically there should be
>>> no new data after termination.
>>>> I think I am not convinced to these arguments. First of all I'm afraid
>>>> there is no clear distinction in that area what is runtime and what is
>>>> not. I always found `AbstractStreamOperator(*)` actually part of runtime
>>>> or Flink's internals and thus I don't find `InternalTimerService` a
>>>> utility, but a vital part of the system. Let's be honest it is
>>>> impossible to implement an operator without extending from
>>>> `AbstractStreamOperator*`. What would be the problem with having a
>>>> proper implementation in `InternalTimerService`? Can't we do it like
>>>> this?:
>>> I think the original paragraph only explains that the interface
>>> is harder to support if we allow users to implement arbitrary logic.
>>> But since we are now on the same page about the callback option, users
>>> will always be able to implement arbitrary logic whether or not we
>>> support timer.trigger(), so I think there is no longer any divergence
>>> on this point. I also believe we'll finally have some logic similar to
>>> the proposed one that drains all the timers and processes them.
>>> Best,
>>> Yun Gao
>>> [1] https://github.com/apache/flink/blob/a7fdab8b23cddf568fa32ee7eb804d7c3eb23a35/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L488
>>> ------------------------------------------------------------------
>>> From:Dawid Wysakowicz<dw...@apache.org>
>>> Send Time:2022 Nov. 28 (Mon.) 23:33
>>> To:dev<de...@flink.apache.org>
>>> Subject:Re: [DISCUSS] FLIP-269: Properly Handling the Processing Timers on Job Termination
>>> Do we really need to have separate methods for
>>> triggering/waiting/cancelling? To me it sounds rather counterintuitive. Why
>>> can't users just execute whatever they want in the handler itself instead
>>> of an additional back and forth with the system? Moreover it still has the
>>> downside that if you call back to the `onTimer` method after `trigger` you
>>> have access to the Context, which lets you register new timers.
>>> I find the following approach much simpler:
>>> void onTimer(...) {
>>>   doHandleTimer(timestamp);
>>> }
>>> void processPendingTimer(...) {
>>>   // trigger
>>>   doHandleTimer(timestamp);
>>>   // for cancel, simply do nothing...
>>> }
>>>> Sorry, I might not have made it very clear here. I think the difficulty
>>>> with supporting setting the currentKey is an issue specific to the
>>>> callback options (no matter what the interface is), since they allow
>>>> users to execute logic other than the one registered with the timers.
>>>> The complexity comes from the fact that we currently have two levels of
>>>> TimerServices: the ProcessingTimerService (there is no key) and the
>>>> InternalTimerService (with key). Currently only ProcessingTimerService
>>>> is exposed to the runtime and InternalTimerService is much more a
>>>> utility to implement the operator. Then with the current code, the
>>>> runtime can only access the ProcessingTimerService on termination.
>>> I think I am not convinced by these arguments. First of all I'm afraid
>>> there is no clear distinction in that area between what is runtime and
>>> what is not. I always found `AbstractStreamOperator(*)` actually part of
>>> runtime or Flink's internals and thus I don't find `InternalTimerService`
>>> a utility, but a vital part of the system. Let's be honest, it is
>>> impossible to implement an operator without extending
>>> `AbstractStreamOperator*`.
>>> What would be the problem with having a proper implementation in
>>> `InternalTimerService`? Can't we do it like this?:
>>> AbstractStreamOperator#finish() {
>>>   internalTimerService.finish();
>>> }
>>> InternalTimerService#finish() {
>>>   while ((timer = processingTimeTimersQueue.peek()) != null) {
>>>     keyContext.setCurrentKey(timer.getKey());
>>>     processingTimeTimersQueue.poll();
>>>     onEndOfInputHandler.processPendingTimer(timer);
>>>   }
>>> }
>>>> If we only execute some predefined actions, we do not need to worry
>>>> about the implementation of InternalTimerService and just execute the
>>>> registered timers. But if we allow users to execute arbitrary logic, we
>>>> also need to be aware of the InternalTimerServices and parse the key
>>>> from the timers stored in it. I think we can always find a way to
>>>> overcome this issue, but supporting the callback options would be more
>>>> complex.
>>> I am not sure having "predefined actions" would be good enough that we
>>> do not need to set a key. As a user I'd anyhow expect the proper key to be
>>> set in processPendingTimer.
>>> Best,
>>> Dawid
>>> On 24/11/2022 08:51, Yun Gao wrote:
>>> Hi Piotr / Divye,
>>> Many thanks for the discussion! First, IMO it seems we have reached
>>> consensus on the high-level API: most operators should usually have only
>>> one reasonable action for the pending timers on termination, thus we could
>>> let the operators implement their own actions with the low-level interface
>>> provided. The only exception is the ProcessFunction, with which users might
>>> register customized timers, thus users might also define the actions on
>>> termination (if I have misunderstood anything here, please correct me).
>>> For the low-level API, I can see the benefits of the callback option:
>>> since in most cases an operator has only one action for all the timers,
>>> it's a waste for us to store the same flag for all the timers, along with
>>> a lot of code / state format changes. But since it is enough for most users
>>> to simply trigger / cancel the timers, it would be redundant for users to
>>> implement the logic twice. Thus perhaps we might combine the benefits of
>>> the two options. We might have a separate interface:
>>> public interface TimerHandlersOnTermination {
>>>   void processPendingTimer(Timer timer, long currentTime);
>>> }
>>> public class Timer {
>>>   long getRegisteredTimestamp();
>>>   void trigger();
>>>   void waitFor();
>>>   void cancel();
>>> }
>>> Then if an operator has implemented the TimerHandlersOnTermination
>>> interface, on termination we could call processPendingTimer(xx) for every
>>> pending timer. Users might simply trigger / waitFor / cancel it, or execute
>>> some other logic if needed. Then for the ProcessFunction we might have a
>>> similar interface to processPendingTimer, except we might need to provide
>>> Context / Collector to the ProcessFunction. Do you think this would be a
>>> good direction?
>>> Also @Piotr
>>>> I don't see a problem here. Interface doesn't have to reflect that,
>>>> only the runtime must set the correct key context before executing the
>>>> handler dealing with the processing time timers at the end of input/time.
>>> Sorry, I might not have made it very clear here. I think the difficulty
>>> with supporting setting the currentKey is an issue specific to the
>>> callback options (no matter what the interface is), since they allow
>>> users to execute logic other than the one registered with the timers.
>>> The complexity comes from the fact that we currently have two levels of
>>> TimerServices: the ProcessingTimerService (there is no key) and the
>>> InternalTimerService (with key). Currently only ProcessingTimerService
>>> is exposed to the runtime and InternalTimerService is much more a
>>> utility to implement the operator. Then with the current code, the
>>> runtime can only access the ProcessingTimerService on termination.
>>> If we only execute some predefined actions, we do not need to worry
>>> about the implementation of InternalTimerService and just execute the
>>> registered timers. But if we allow users to execute arbitrary logic, we
>>> also need to be aware of the InternalTimerServices and parse the key
>>> from the timers stored in it. I think we can always find a way to
>>> overcome this issue, but supporting the callback options would be more
>>> complex.
>>> Best,
>>> Yun Gao
>> ------------------------------------------------------------------
>> From:Divye Kapoor<dk...@pinterest.com.INVALID>
>> Send Time:2022 Nov. 24 (Thu.) 08:50
>> To:dev<de...@flink.apache.org>
>> Cc:Xenon Development Team<xe...@pinterest.com>
>> Subject:Re: Re: [DISCUSS] FLIP-269: Properly Handling the Processing Timers on Job Termination
>>
>> Sounds good. Looks like we're on the same page. Thanks!
>> Divye
>>
>> On Wed, Nov 23, 2022 at 2:41 AM Piotr Nowojski <pnowojski@apache.org> wrote:
>>
>> Hi Divye
>> I think we are mostly on the same page. Just to clarify/rephrase:
>>
>> One thing to think about - on EOF “trigger immediately” will mean that the
>> asynchronous wait timeout timers will also fire - which is undesirable
>>
>> I didn't mean to fire all timers immediately in all of the built-in
>> operators. Just that each built-in operator can have a hard coded way
>> (without a way for users to change it) to handle those timers. Windowed
>> operators would trigger the lingering timers (flush outputs),
>> AsyncWaitOperator could just ignore them. The same way users could register
>> EOF timer handlers in the ProcessFunction as Dawid Wysakowicz proposed, we
>> (as flink developers) could use the same mechanism to implement any
>> behaviour we want for the built-in operators. There should be no need to
>> add any separate mechanism.
>> Best, Piotrek
>>
>> On Wed, 23 Nov 2022 at 08:21, Divye Kapoor<dk...@pinterest.com.invalid> wrote:
>>
>> Thanks Yun/Piotrek, Some brief comments inline below.
>>
>> On Tue, Nov 22, 2022 at 1:37 AM Piotr Nowojski <pn...@apache.org> wrote:
>>
>> Hi, All in all I would agree with Dawid's proposal.
>>
>> +1
>>
>> We can add the flexibility of how to deal with the timers in the low level
>> API via adding a handler - if someone needs to customize it, he will always
>> have a workaround. Note after giving it more thought, I agree that
>> registering some handlers is better than overloading the register timer
>> method and modifying the timer's state.
>>
>> +1.
>>
>> At the same time, we can force the most sensible semantic that we think
>> for the couple of built-in operators, which should be pretty
>> straightforward (either ignore the timers, or fire them at once). I agree
>> there might be some edge cases, that theoretically user might want to wait
>> for the timer to fire naturally, but:
>> 1. I'm not sure how common in practice this will be. If not at all, then
>> why should we be complicating the API/system?
>>
>> That’s fair. However, the specifics are very important here. One thing to
>> think about - on EOF “trigger immediately” will mean that the asynchronous
>> wait timeout timers will also fire - which is undesirable (because they are
>> racing with the last async call). However, the issue is cleanly resolved by
>> waiting for the timer to be canceled when the last event is processed.
>> (“Wait for” case). Ignoring the timer has the least justification.
>> Registering the handler as per Dawid’s proposal and having that handler
>> unregister the timers on EOF makes best sense. This solution also unifies
>> the trigger immediately case as that handler can reregister the timers for
>> early termination.
>> The proposal:
>> 1. Operator receives EOF
>> 2. EOF timer handler triggers
>> 3. EOF handler adjusts the registered timers for early trigger or ignore.
>> If wait-for behavior is desired, timers are not changed. This is controlled
>> in client code.
>> 4. Operator waits for all timers to drain/trigger. (“Always”). There is no
>> special handling for ignore/early trigger.
>> 5. Operator allows job to proceed with shutdown.
>> The only api change needed is an EOF handler. The other agreement we need
>> is that “Wait for” is the desired behavior in processing time and that
>> processing time is fundamentally different from event time in this respect.
>> (I have changed my thinking since the last mail).
>>
>> 2. We can always expand the API in the future, and let the user override
>> the default built-in behaviour of the operators via some setter on the
>> stream transformation (`SingleOutputStreamOperator`), or via some custom
>> API DSL style in each of the operators separately.
>>
>> This is not required. See above.
>>
>> Re forcing the same semantics for processing time timers as for event time
>> ones - this is tempting, but indeed I see a possibility that users need to
>> adhere to some external constraints when using processing time.
>>
>> +1. As above, we should consider the 2 cases fundamentally different in
>> this area.
>>
>> Re: Yun -
>> b) Another issue is that what if users use timers with different
>> termination actions in the same operator / UDF? For example, users use some
>> kind of timeout (like throws exception if some thing not happen after some
>> other thing), and also some kind of window aggregation logic. In this case,
>> without additional tags, users might not be able to distinguish which timer
>> should be canceled and which time should be triggered ?
>>
>> as above. The EOF handler makes the choice.
>>
>> 4. How could these scenarios adjust their APIs ? From the current listed
>> scenarios, I'm more tend to that as @Dawid pointed out, there might be only
>> one expected behavior for each scenario, thus it does not seems to need to
>> allow users to adjust the behavior. Thus @Divye may I have a double
>> confirmation currently do we have explicit scenarios that is expected to
>> change the different behaviors for the same scenario?
>>
>> Wait-for behavior is probably the only expected behavior and any
>> alterations should be from the EOF handler managing the registered timers.
>>
>> Besides @Divye from the listed scenarios, I have another concern for global
>> configuration is that for one job, different operators seems to still might
>> have different expected behaviors. For example, A job using both Window
>> operator and AsyncWaitOperator might have different requirements for timers
>> on termination?
>>
>> Thank you for raising this case. This changed my thinking. Based on your
>> point, we should try and align on the “Wait-for” with EOF handler proposal.
>> I’m withdrawing the “single-runtime-config” proposal.
>> Best, Divye
>>

Re: [DISCUSS] FLIP-269: Properly Handling the Processing Timers on Job Termination

Posted by Yun Tang <ta...@apache.org>.
Sorry to pick up this discussion again after a long time.
@Yun Gao, could you share the PoC if possible?

Best
Yun Tang

On 2022/12/08 10:20:21 Piotr Nowojski wrote:
> Sounds good to me as well!
> 
> Best,
> Piotrek
> 
> On Thu, 8 Dec 2022 at 09:53, Dawid Wysakowicz <dw...@apache.org>
> wrote:
> 
> > Sounds like a good plan to me.
> > On 08/12/2022 08:58, Yun Gao wrote:
> >
> > Hi Dawid,
> >
> > Many thanks for the discussion, and sorry for the delayed response;
> > I was hesitating on some points.
> >
> > But as a whole, with some more thought, I first agree that adding
> > the trigger() / cancel() methods to some kind of timer object is not
> > necessary for us to achieve exactly-once for the operators. We could follow the
> > direction of "modifying the implementation of the operators" to achieve the
> > same target.
> >
> > But continuing to think in this direction, it now looks to me that it is also
> > not needed to add the callback to the timer services:
> > 1. For InternalTimerService, the operators could just call
> > `InternalTimerService#forEachProcessingTimer()` on finish to handle the
> > pending timers.
> > 2. For the timers registered to the underlying ProcessingTimerService, at
> > least in the currently listed scenarios, the operator itself knows what the
> > remaining work is (e.g., the FileWriter knows if it has an in-progress file
> > to flush).
> >
> > Operators could handle the remaining timers in the finish() method.
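
A rough Java sketch of the "handle it in finish()" idea, under assumptions: SketchKeyedTimers and SummingOperator are made-up stand-ins, not Flink classes, and forEachProcessingTimer is only named after the method mentioned above, with a signature assumed here for illustration. The operator iterates its pending processing-time timers on finish and flushes the per-key state that each timer would have flushed.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Stand-in for a keyed timer service (one timer per key, for brevity).
// The forEachProcessingTimer signature is an assumption of this sketch.
class SketchKeyedTimers<K> {
    private final Map<K, Long> timers = new LinkedHashMap<>();

    void registerProcessingTimeTimer(K key, long timestamp) {
        timers.putIfAbsent(key, timestamp);
    }

    void forEachProcessingTimer(BiConsumer<K, Long> action) {
        timers.forEach(action);
    }

    void clear() {
        timers.clear();
    }
}

class FinishDrainSketch {
    // Hypothetical window-like operator: sums values per key, flushes on timer.
    static class SummingOperator {
        final SketchKeyedTimers<String> timerService = new SketchKeyedTimers<>();
        final Map<String, Integer> sums = new LinkedHashMap<>();
        final List<String> output = new ArrayList<>();

        void processElement(String key, int value) {
            sums.merge(key, value, Integer::sum);
            timerService.registerProcessingTimeTimer(key, 1000L);
        }

        // Shared by the regular timer path (not shown) and by finish().
        private void flush(String key) {
            output.add(key + ":" + sums.remove(key));
        }

        // No callback on the timer service: the operator drains the timers itself.
        void finish() {
            timerService.forEachProcessingTimer((key, timestamp) -> flush(key));
            timerService.clear();
        }
    }

    static List<String> run() {
        SummingOperator operator = new SummingOperator();
        operator.processElement("a", 1);
        operator.processElement("b", 2);
        operator.processElement("a", 3);
        operator.finish();
        return operator.output;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```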
> >
> > Then the only interface we need to consider is the one added to the
> > ProcessFunction. The current interface also looks ok to me.
> >
> > If you think the above option works, I could first do a PoC that
> > demonstrates it is sufficient to only modify the operator implementations
> > to handle the remaining work properly on finish(). If there are new issues
> > I'll post them here and we can have some more discussion.
> >
> > Best,
> > Yun Gao
> >
> >
> > ------------------Original Mail ------------------
> > *Sender:*Dawid Wysakowicz <dw...@apache.org>
> > *Send Date:*Fri Dec 2 21:21:25 2022
> > *Recipients:*Dev <de...@flink.apache.org>
> > *Subject:*Re: [DISCUSS] FLIP-269: Properly Handling the Processing Timers on Job Termination
> >
> >> On 30/11/2022 12:13, Yun Gao wrote:
> >>
> >> Hi Dawid, PiotrVery thanks for the discussion!As a whole I think we are already consistent with the callback option, and I don't think I opposed that we could modify the current internal implementation. But from my side it is still not clear what the actual interfaces are proposing. Let me first try to summarize that a bit:1) Which object does the handlers register on?It seems there are two options, one is to timer services (InternalTimerService/ ProcessingTimerService or some equivalent things after refactoring), the otherone is as a lifecycle of the operator. I'm now tending to the latter one, how do you think on this part?2) What is the interface of the handler?Option 1 is that interface SomeHandlerName { void processingTimer(Timer timer);}class Timer { long getTimestamp(); void trigger(); void cancel(); // Other actions if required. }But it seems there is controversy on whether to add actions to the timer class. If without that, with my understanding the interfaces of the Option 2 areinterface SomeHandlerName { void processTimer(Timer timer); }interface KeyedSomeHandlerName<KEY, NAMESPACE> { void processKeyedTimer(KeyedTimer<KEY, NAMESPACE> timer, Context ctx); }class Timer { long getTimestamp();}class KeyedTimer<KEY, NAMESPACE> extends Timer { KEY getKey(); NAMESPACE getNamespace();}void Context {void executeAtScheduledTime(Consumer<timer> handler);}As Piotr has pointed out, if we could eliminate the logic of namespace, we could thenremove the namespace related type parameter and method from the interfaces.Do I understand right?Besides, I'm still fully got the reason that why we should not add the actions to the timer class, in consideration that it seems in most cases users could implement their logical with simply calling timer.trigger() (I think the repeat registration is indeed a problem, but I think we could ignore the timers registered during termination). 
Could you further enlighten me a bit on this part?Best,Yun Gao------------------------------------------------------------------From:Piotr Nowojski <pn...@apache.org> <pn...@apache.org>Send Time:2022 Nov. 30 (Wed.) 17:10To:dev <de...@flink.apache.org> <de...@flink.apache.org>Subject:Re: [DISCUSS] FLIP-269: Properly Handling the Processing Timers on Job TerminationHi,I have a couple of remarks.First a general one. For me the important part in the design of this API ishow to expose this to Flink users in public interfaces. NamelyProcessFunction and StreamOperator. InternalTimerService is an internalclass, so we can change it and break it as needed in the future.For registering a handler like proposed by Dawid:interface SomeHandlerName { void onTimer(/* whatever type it is */ timer, Context ctx ) { }}makes sense to me. For the InternalTimerService I think it doesn't mattertoo much what we do. We could provide a similar interface as for theProcessFunction/StreamOperator, it doesn't have to be the same one. On thecontrary, I think it shouldn't be the same, as part of this effort weshouldn't be exposing the concept of `Namespaces` to the public facing API.Re the "waitFor". Theoretically I see arguments why users might want to usethis, but I'm also not convinced whether that's necessary in practice. Iwould be +1 either way. First version can be without this functionality andwe can add it later (given that we designed a good place to add it in thefuture, like the `Context` proposed by Dawid). But I'm also fine adding itnow if others are insisting.Best,Piotrekśr., 30 lis 2022 o 09:18 Dawid Wysakowicz <dw...@apache.org> <dw...@apache.org>napisał(a):
> >>
> >> WindowOperator is not implemented by users. I can see that for
> >> InternalTimerService we'll need
> >>
> >> interface PendingTimerProcessor<KEY, NAMESPACE> {
> >>     void onTimer(InternalTimer<KEY, NAMESPACE> timer) {
> >>         doHandleTimer(timer);
> >>     }
> >> }
> >>
> >> I don't see a problem with that.
> >>
> >> As you said, ProcessingTimeService is a user facing interface and
> >> completely unrelated to the InternalTimerService. I don't see a reason
> >> why we'd need to unify those.
> >>
> >> As for the waitFor behaviour: personally, I have not been convinced it
> >> is necessary. Maybe it's just my lack of vision, but I can't think of a
> >> scenario where I'd use it. Still, if we need it, I'd go for something
> >> like:
> >>
> >> void onTimer(/* whatever type it is */ timer, Context ctx ) {
> >> }
> >>
> >> interface Context {
> >>     void executeAtScheduledTime(Consumer<Timer> handler);
> >> }
> >>
> >> That way you have independent simple interfaces that need to work only
> >> in a single well defined scenario and you don't need to match an
> >> interface to multiple different cases.
> >>
> >> Best,
> >> Dawid
> >>
> >> On 30/11/2022 07:27, Yun Gao wrote:
> >>
> >> Hi Dawid,
> >>
> >> Thanks for the comments!
> >>
> >> As a whole I'm also open to the API, and I also prefer simple but
> >> flexible interfaces, but it still looks like there are some problems
> >> with just letting users implement the termination actions.
> >>
> >> Let's take the WindowOperator as an example. As seen in [1], the timer
> >> processing logic needs to acquire the key / namespace information bound
> >> to the timer (which is only supported by the InternalTimerService).
> >> Thus if we want users to implement the same logic on termination, we
> >> either let users trigger the timer handler directly, or we also allow
> >> users to access these pieces of information. If we go in the latter
> >> direction, we might need to provide interfaces like
> >>
> >> interface PendingTimerProcessor<KEY, NAMESPACE> {
> >>     void onTimer(Timer<KEY, NAMESPACE> timer) {
> >>         doHandleTimer(timer);
> >>     }
> >> }
> >>
> >> class Timer<KEY, NAMESPACE> {
> >>     long getTimestamp();
> >>     KEY getKey();
> >>     NAMESPACE getNamespace();
> >> }
> >>
> >> Then we'll have the issue that, since the interface needs to handle both
> >> the InternalTimerService and the raw ProcessingTimeService cases, the
> >> latter does not have key and namespace information attached, and it
> >> would also be a bit inconsistent for users to have to set the KEY and
> >> NAMESPACE types.
> >>
> >> Besides, it looks to me that if we want to implement behaviors like
> >> waiting for, it might not be possible to simply reuse the timer handler,
> >> so every operator author would have to re-implement such waiting logic.
> >>
> >> > Moreover it still has the downside that if you call back to the
> >> > `onTimer` method after `trigger` you have access to the Context which
> >> > lets you register new timers.
> >>
> >> I think we could simply drop the timers registered after we start
> >> processing the pending timers on termination. Logically there should be
> >> no new data after termination.
> >>
> >> > I think I am not convinced by these arguments. First of all I'm afraid
> >> > there is no clear distinction in that area what is runtime and what is
> >> > not. I always found `AbstractStreamOperator(*)` actually part of
> >> > runtime or Flink's internals and thus I don't find
> >> > `InternalTimerService` a utility, but a vital part of the system.
> >> > Let's be honest it is impossible to implement an operator without
> >> > extending from `AbstractStreamOperator*`.
> >> >
> >> > What would be the problem with having a proper implementation in
> >> > `InternalTimerService`? Can't we do it like this?:
> >>
> >> I think the original paragraph only explains that the interface is
> >> harder to support if we allow users to implement arbitrary logic. But
> >> since we are now on the same page about the callback option, users will
> >> always be able to implement arbitrary logic whether or not we support
> >> timer.trigger(), so I think there is no divergence on this point. I also
> >> believe we'll finally have some logic similar to the proposed one that
> >> drains all the timers and processes them.
> >>
> >> Best,
> >> Yun Gao
> >>
> >> [1]
> >>
> >> https://github.com/apache/flink/blob/a7fdab8b23cddf568fa32ee7eb804d7c3eb23a35/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L488
> >>
> >> ------------------------------------------------------------------
> >> From: Dawid Wysakowicz <dw...@apache.org>
> >> Send Time: 2022 Nov. 28 (Mon.) 23:33
> >> To: dev <de...@flink.apache.org>
> >> Subject: Re: [DISCUSS] FLIP-269: Properly Handling the Processing Timers
> >> on Job Termination
> >>
> >> Do we really need to have separate methods for
> >> triggering/waiting/cancelling? To me it sounds rather counterintuitive.
> >> Why can't users just execute whatever they want in the handler itself
> >> instead of additional back and forth with the system? Moreover it still
> >> has the downside that if you call back to the `onTimer` method after
> >> `trigger` you have access to the Context which lets you register new
> >> timers.
> >>
> >> I find the following approach much simpler:
> >>
> >> void onTimer(...) {
> >>     doHandleTimer(timestamp);
> >> }
> >>
> >> void processPendingTimer(...) {
> >>     // trigger
> >>     doHandleTimer(timestamp);
> >>     // for cancel, simply do nothing...
> >> }
> >>
> >> > Sorry, I might not have made it very clear here. I think the
> >> > difficulty with supporting setting the currentKey is a special issue
> >> > for the callback options (no matter what the interface is) since it
> >> > allows users to execute logic other than the one registered with the
> >> > timers. The complexity comes from the fact that currently we have two
> >> > levels of TimerServices: the ProcessingTimerService (there is no key)
> >> > and the InternalTimerService (with key). Currently only
> >> > ProcessingTimerService is exposed to the runtime and
> >> > InternalTimerService is much more a utility to implement the operator.
> >> > Then with the current code, the runtime could only access the
> >> > ProcessingTimerService on termination.
> >>
> >> I think I am not convinced by these arguments. First of all I'm afraid
> >> there is no clear distinction in that area what is runtime and what is
> >> not. I always found `AbstractStreamOperator(*)` actually part of runtime
> >> or Flink's internals and thus I don't find `InternalTimerService` a
> >> utility, but a vital part of the system. Let's be honest it is
> >> impossible to implement an operator without extending from
> >> `AbstractStreamOperator*`.
> >>
> >> What would be the problem with having a proper implementation in
> >> `InternalTimerService`? Can't we do it like this?:
> >>
> >> AbstractStreamOperator#finish() {
> >>     internalTimerService.finish();
> >> }
> >>
> >> InternalTimerService#finish() {
> >>     while ((timer = processingTimeTimersQueue.peek()) != null) {
> >>         keyContext.setCurrentKey(timer.getKey());
> >>         processingTimeTimersQueue.poll();
> >>         onEndOfInputHandler.processPendingTimer(timer);
> >>     }
> >> }
> >>
> >> > If we only execute some predefined actions, we do not need to worry
> >> > about the implementation of InternalTimerService and just execute the
> >> > registered timers. But if we allow users to execute arbitrary logic,
> >> > we also need to be aware of the InternalTimerServices and parse the
> >> > key from the timers stored in it. I think we should always have a
> >> > method to overcome this issue, but supporting the callback options
> >> > would be more complex.
> >>
> >> I am not sure having "predefined actions" would be good enough that we
> >> do not need to set a key. As a user I'd anyhow expect the proper key to
> >> be set in processPendingTimer.
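
The drain loop outlined in this message can be made concrete. The following is a minimal, self-contained Java sketch and not Flink code: `KeyedTimer`, `PendingTimerHandler`, and `SketchTimerService` are hypothetical stand-ins invented for illustration, and a plain `String` field plays the role of the key context. It only demonstrates the property under discussion, namely that the key is set before each pending timer is handed to the end-of-input handler.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Entry point first, so the file can also be launched as a single-file program.
final class KeyedDrainDemo {
    static List<String> run() {
        SketchTimerService service = new SketchTimerService();
        service.register("a", 30L);
        service.register("b", 20L);
        service.register("a", 10L);

        List<String> seen = new ArrayList<>();
        // The handler reads the key from the service, as a UDF would read it from its context.
        service.finish(timer -> seen.add(service.getCurrentKey() + "@" + timer.timestamp));
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}

// Hypothetical timer type (assumption): a key plus a processing-time timestamp.
final class KeyedTimer {
    final String key;
    final long timestamp;

    KeyedTimer(String key, long timestamp) {
        this.key = key;
        this.timestamp = timestamp;
    }
}

// Hypothetical end-of-input callback (assumption), named after the mail's processPendingTimer.
interface PendingTimerHandler {
    void processPendingTimer(KeyedTimer timer);
}

// Hypothetical stand-in for a keyed timer service; not Flink's InternalTimerService.
final class SketchTimerService {
    private final PriorityQueue<KeyedTimer> processingTimeTimersQueue =
            new PriorityQueue<>(Comparator.comparingLong((KeyedTimer t) -> t.timestamp));
    private String currentKey; // plays the role of the operator's key context

    void register(String key, long timestamp) {
        processingTimeTimersQueue.add(new KeyedTimer(key, timestamp));
    }

    String getCurrentKey() {
        return currentKey;
    }

    // Drain pending timers in timestamp order, setting the key before each callback.
    void finish(PendingTimerHandler handler) {
        KeyedTimer timer;
        while ((timer = processingTimeTimersQueue.poll()) != null) {
            currentKey = timer.key;
            handler.processPendingTimer(timer);
        }
        currentKey = null;
    }
}
```

In this sketch a timer registered from inside the handler would be picked up by the same loop, which is exactly the follow-up registration question raised elsewhere in the thread; the sketch deliberately does not take a position on it.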
> >>
> >> Best,
> >> Dawid
> >>
> >> On 24/11/2022 08:51, Yun Gao wrote:
> >>
> >> Hi Piotr / Divye,
> >>
> >> Many thanks for the discussion!
> >>
> >> First, IMO it seems we have reached consensus on the high-level API:
> >> most operators should usually have only one reasonable action for the
> >> pending timers on termination, so we could let each operator implement
> >> its own action with the low-level interface provided. The only exception
> >> is the ProcessFunction, with which users might register customized
> >> timers, so users might also define the actions on termination (if I have
> >> misunderstood anything here, please correct me).
> >>
> >> For the low-level API, I can see the benefits of the callback option:
> >> since in most cases an operator has only one action for all the timers,
> >> it is a waste for us to store the same flag for every timer, and it
> >> would also need a lot of code / state format changes. But since it is
> >> enough for most users to simply trigger / cancel the timers, it would be
> >> redundant for users to implement the logic twice. Thus perhaps we might
> >> combine the benefits of the two options. We might have a separate
> >> interface
> >>
> >> public interface TimerHandlersOnTermination {
> >>     void processPendingTimer(Timer timer, long currentTime);
> >> }
> >>
> >> public class Timer {
> >>     long getRegisteredTimestamp();
> >>     void trigger();
> >>     void waitFor();
> >>     void cancel();
> >> }
> >>
> >> Then if an operator has implemented the TimerHandlersOnTermination
> >> interface, on termination we could call processPendingTimer(xx) for
> >> every pending timer. Users might simply trigger / waitFor / cancel it,
> >> or execute some other logic if needed. Then for the ProcessFunction we
> >> might have a similar interface to processPendingTimer, except we might
> >> need to provide Context / Collector to the ProcessFunction. Do you think
> >> this would be a good direction?
> >>
> >> Also @Piotr
> >>
> >> > I don't see a problem here. Interface doesn't have to reflect that,
> >> > only the runtime must set the correct key context before executing the
> >> > handler dealing with the processing time timers at the end of
> >> > input/time.
> >>
> >> Sorry, I might not have made it very clear here. I think the difficulty
> >> with supporting setting the currentKey is a special issue for the
> >> callback options (no matter what the interface is) since it allows users
> >> to execute logic other than the one registered with the timers. The
> >> complexity comes from the fact that currently we have two levels of
> >> TimerServices: the ProcessingTimerService (there is no key) and the
> >> InternalTimerService (with key). Currently only ProcessingTimerService
> >> is exposed to the runtime and InternalTimerService is much more a
> >> utility to implement the operator. Then with the current code, the
> >> runtime could only access the ProcessingTimerService on termination. If
> >> we only execute some predefined actions, we do not need to worry about
> >> the implementation of InternalTimerService and just execute the
> >> registered timers. But if we allow users to execute arbitrary logic, we
> >> also need to be aware of the InternalTimerServices and parse the key
> >> from the timers stored in it. I think we should always have a method to
> >> overcome this issue, but supporting the callback options would be more
> >> complex.
> >>
> >> Best,
> >> Yun Gao
> >>
> >> ------------------------------------------------------------------
> >> From: Divye Kapoor <dk...@pinterest.com.INVALID>
> >> Send Time: 2022 Nov. 24 (Thu.) 08:50
> >> To: dev <de...@flink.apache.org>
> >> Cc: Xenon Development Team <xe...@pinterest.com>
> >>
> >> Subject: Re: Re: [DISCUSS] FLIP-269: Properly Handling the Processing
> >> Timers on Job Termination
> >>
> >> Sounds good. Looks like we're on the same page. Thanks!
> >>
> >> Divye
> >>
> >> On Wed, Nov 23, 2022 at 2:41 AM Piotr Nowojski <pn...@apache.org> wrote:
> >>
> >> Hi Divye
> >>
> >> I think we are mostly on the same page. Just to clarify/rephrase:
> >>
> >> > One thing to think about - on EOF “trigger immediately” will mean that
> >> > the asynchronous wait timeout timers will also fire - which is
> >> > undesirable
> >>
> >> I didn't mean to fire all timers immediately in all of the built-in
> >> operators. Just that each built-in operator can have a hard coded way
> >> (without a way for users to change it) to handle those timers. Windowed
> >> operators would trigger the lingering timers (flush outputs),
> >> AsyncWaitOperator could just ignore them. The same way users could
> >> register EOF timer handlers in the ProcessFunction as Dawid Wysakowicz
> >> proposed, we (as flink developers) could use the same mechanism to
> >> implement any behaviour we want for the built-in operators. There should
> >> be no need to add any separate mechanism.
> >>
> >> Best,
> >> Piotrek
> >>
> >> On Wed, 23 Nov 2022 at 08:21, Divye Kapoor <dk...@pinterest.com.invalid> wrote:
> >>
> >> Thanks Yun/Piotrek,
> >>
> >> Some brief comments inline below.
> >>
> >> On Tue, Nov 22, 2022 at 1:37 AM Piotr Nowojski <pn...@apache.org> wrote:
> >>
> >> > Hi,
> >> >
> >> > All in all I would agree with Dawid's proposal.
> >>
> >> +1
> >>
> >> > We can add the flexibility of how to deal with the timers in the low
> >> > level API via adding a handler - if someone needs to customize it, he
> >> > will always have a workaround. Note after giving it more thought, I
> >> > agree that registering some handlers is better than overloading the
> >> > register timer method and modifying the timer's state.
> >>
> >> +1.
> >>
> >> > At the same time, we can force the most sensible semantic that we
> >> > think for the couple of built-in operators, which should be pretty
> >> > straightforward (either ignore the timers, or fire them at once). I
> >> > agree there might be some edge cases, that theoretically user might
> >> > want to wait for the timer to fire naturally, but:
> >> > 1. I'm not sure how common in practice this will be. If not at all,
> >> > then why should we be complicating the API/system?
> >>
> >> That’s fair. However, the specifics are very important here. One thing
> >> to think about - on EOF “trigger immediately” will mean that the
> >> asynchronous wait timeout timers will also fire - which is undesirable
> >> (because they are racing with the last async call). However, the issue
> >> is cleanly resolved by waiting for the timer to be canceled when the
> >> last event is processed. (“Wait for” case).
> >>
> >> Ignoring the timer has the least justification. Registering the handler
> >> as per Dawid’s proposal and having that handler unregister the timers on
> >> EOF makes best sense. This solution also unifies the trigger immediately
> >> case as that handler can reregister the timers for early termination.
> >>
> >> The proposal:
> >> 1. Operator receives EOF
> >> 2. EOF timer handler triggers
> >> 3. EOF handler adjusts the registered timers for early trigger or
> >> ignore. If wait-for behavior is desired, timers are not changed. This is
> >> controlled in client code.
> >> 4. Operator waits for all timers to drain/trigger. (“Always”). There is
> >> no special handling for ignore/early trigger.
> >> 5. Operator allows job to proceed with shutdown.
> >>
> >> The only api change needed is an EOF handler.
> >>
> >> The other agreement we need is that “Wait for” is the desired behavior
> >> in processing time and that processing time is fundamentally different
> >> from event time in this respect. (I have changed my thinking since the
> >> last mail).
> >>
> >> > 2. We can always expand the API in the future, and let the user
> >> > override the default built-in behaviour of the operators via some
> >> > setter on the stream transformation (`SingleOutputStreamOperator`), or
> >> > via some custom API DSL style in each of the operators separately.
> >>
> >> This is not required. See above.
> >>
> >> > Re forcing the same semantics for processing time timers as for event
> >> > time ones - this is tempting, but indeed I see a possibility that
> >> > users need to adhere to some external constraints when using
> >> > processing time.
> >>
> >> +1. As above, we should consider the 2 cases fundamentally different in
> >> this area.
> >>
> >> Re: Yun -
> >>
> >> > b) Another issue is what happens if users use timers with different
> >> > termination actions in the same operator / UDF? For example, users use
> >> > some kind of timeout (like throwing an exception if something does not
> >> > happen after some other thing), and also some kind of window
> >> > aggregation logic. In this case, without additional tags, users might
> >> > not be able to distinguish which timer should be canceled and which
> >> > timer should be triggered?
> >>
> >> As above. The EOF handler makes the choice.
> >>
> >> > 4. How could these scenarios adjust their APIs? From the currently
> >> > listed scenarios, I tend to think that, as @Dawid pointed out, there
> >> > might be only one expected behavior for each scenario, so it does not
> >> > seem necessary to allow users to adjust the behavior. Thus @Divye may
> >> > I double-check whether we currently have explicit scenarios where
> >> > different behaviors are expected for the same scenario?
> >>
> >> Wait-for behavior is probably the only expected behavior and any
> >> alterations should be from the EOF handler managing the registered
> >> timers.
> >>
> >> > Besides @Divye, from the listed scenarios, another concern I have with
> >> > a global configuration is that within one job, different operators
> >> > might still have different expected behaviors. For example, a job
> >> > using both the Window operator and the AsyncWaitOperator might have
> >> > different requirements for timers on termination?
> >>
> >> Thank you for raising this case. This changed my thinking. Based on your
> >> point, we should try and align on the “Wait-for” with EOF handler
> >> proposal. I’m withdrawing the “single-runtime-config” proposal.
> >>
> >> Best,
> >> Divye
> >>
> >>
> 

Re: [DISCUSS] FLIP-269: Properly Handling the Processing Timers on Job Termination

Posted by Piotr Nowojski <pn...@apache.org>.
Sounds good to me as well!

Best,
Piotrek

On Thu, 8 Dec 2022 at 09:53, Dawid Wysakowicz <dw...@apache.org> wrote:

> Sounds like a good plan to me.
> On 08/12/2022 08:58, Yun Gao wrote:
>
> Hi Dawid,
>
> Many thanks for the discussion, and sorry for the delayed response;
> I was hesitating on some points.
>
> But as a whole, with some more thought, I first agree that adding
> the trigger() / cancel() methods to some kind of timer object is not
> necessary for us to achieve exactly-once for the operators. We could
> follow the direction of "modifying the implementation of the operators"
> to achieve the same target.
>
> But continuing to think in this direction, it now looks to me that it is
> also not necessary to add the callback to the timer services:
> 1. For InternalTimerService, the operators could just call
> `InternalTimerService#forEachProcessingTimer()` on finish to handle the
> pending timers.
> 2. For the timers registered to the underlying ProcessingTimerService, at
> least in the currently listed scenarios, the operator itself knows what
> the remaining work is (e.g., the FileWriter knows if it has an
> in-progress file to flush).
>
> Operators could handle the remaining timers in the finish() method.
>
> Then the only interface we need to consider is the one added to the
> ProcessFunction. The current interface also looks ok to me.
>
> If you think the above option works, I could first build a PoC that
> demonstrates it is sufficient to only modify the operator implementation
> to handle the remaining work properly on finish(). If there are new
> issues I'll post here and we could have some more discussion.
>
> Best,
> Yun Gao
>
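
The option described in this message (no callback registered on the timer services; each operator deals with its own pending timers in `finish()`) can be sketched as follows. This is a minimal illustration under stated assumptions, not Flink code: `PendingTimers`, `WindowLikeOperator`, and `TimeoutLikeOperator` are hypothetical names, and `forEachProcessingTimer` simply borrows the method name used in the mail without claiming to match any real signature. The two behaviours shown (flush on finish versus ignore on finish) follow the distinction drawn earlier in the thread between windowed operators and timeout-style timers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongConsumer;

// Entry point first, so the file can also be launched as a single-file program.
final class FinishWithoutCallbackDemo {
    static List<String> run() {
        List<String> output = new ArrayList<>();

        PendingTimers windowTimers = new PendingTimers();
        windowTimers.register(100L);
        windowTimers.register(200L);
        new WindowLikeOperator(windowTimers, output).finish();

        PendingTimers timeoutTimers = new PendingTimers();
        timeoutTimers.register(300L);
        new TimeoutLikeOperator(timeoutTimers).finish();

        output.add("window pending=" + windowTimers.size());
        output.add("timeout pending=" + timeoutTimers.size());
        return output;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}

// Hypothetical timer store (assumption); only keeps timestamps.
final class PendingTimers {
    private final List<Long> timestamps = new ArrayList<>();

    void register(long timestamp) {
        timestamps.add(timestamp);
    }

    // Visits every pending timer; iterates over a copy so the action cannot break iteration.
    void forEachProcessingTimer(LongConsumer action) {
        for (long timestamp : new ArrayList<>(timestamps)) {
            action.accept(timestamp);
        }
    }

    void clear() {
        timestamps.clear();
    }

    int size() {
        return timestamps.size();
    }
}

// Fires every pending timer on finish, like an operator that must flush its results.
final class WindowLikeOperator {
    private final PendingTimers timers;
    private final List<String> output;

    WindowLikeOperator(PendingTimers timers, List<String> output) {
        this.timers = timers;
        this.output = output;
    }

    void finish() {
        timers.forEachProcessingTimer(ts -> output.add("flush window @" + ts));
        timers.clear();
    }
}

// Drops pending timers on finish, like an operator whose timers are only timeouts.
final class TimeoutLikeOperator {
    private final PendingTimers timers;

    TimeoutLikeOperator(PendingTimers timers) {
        this.timers = timers;
    }

    void finish() {
        timers.clear();
    }
}
```

The design point the sketch tries to show is that the decision lives entirely in each operator's `finish()`; the timer store itself needs no notion of end-of-input.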
>
> ------------------Original Mail ------------------
> *Sender:* Dawid Wysakowicz <dw...@apache.org>
> *Send Date:* Fri Dec 2 21:21:25 2022
> *Recipients:* Dev <de...@flink.apache.org>
> *Subject:* Re: [DISCUSS] FLIP-269: Properly Handling the Processing Timers
> on Job Termination
>
>> Ad. 1
>>
>> I'd start with ProcessingTimeService as that's the only public
>> interface. It is exposed in the Sink V2 interface. In this scenario it
>> would be the Sink interface that needs to extend an EOFTimersHandler. I
>> believe it would be hard to pass it from there to the ProcessingTimeService
>> as it is passed from the outside e.g. in the ProcessingTimeServiceAware.
>> For that reason I'd go with a registration method in that interface.
>>
>> In ProcessFunction I'd go with a mixin approach, so a ProcessFunction can
>> extend from EOFTimersHandler. I'd do that because ProcessFunction does not
>> have an init/open method where we could register the handler.
>>
>> On operator level I'd have a registration method in InternalTimerService.
>> I believe that's the only way to handle the above ProcessFunction approach.
>> E.g. in KeyedProcessOperator you need to check if the UDF extends the
>> interface, not the operator itself.
>>
>> Ad. 2
>>
>> I'd go with
>>
>> *(Keyed)ProcessFunction:*
>>
>> interface EOFTimersHandler {
>>
>>  void handleProcessingTimer(long timestamp, Context ctx);
>>
>> }
>>
>> interface Context {
>>         public abstract <X> void output(OutputTag<X> outputTag, X value);
>>
>>         public abstract K getCurrentKey();
>>
>> // we can extend it for waitFor later
>>
>> }
>>
>> *ProcessingTimeService: *
>>
>> interface EOFTimersHandler {
>>
>>  void handleProcessingTimer(long timestamp, Context ctx);
>>
>> }
>>
>> interface Context {
>>
>> // we can extend it for waitFor later
>>
>> }
>>
>> *InternalTimerService:*
>>
>> interface EOFTimersHandler {
>>
>>  void handleProcessingTimer(InternalTimer<K, N> timer, Context ctx);
>>
>> }
>>
>> interface Context {
>>
>> // we can extend it for waitFor later
>>
>> }
>>
>> Personally I'd not try to unify those places too much. They also have
>> different visibilities (public/internal) and have access to different
>> sets of metadata (key/namespace).
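
The mixin approach from Ad. 1, combined with the `(Keyed)ProcessFunction` handler shape from Ad. 2, might look roughly like the sketch below. It is a self-contained illustration with hypothetical types, not Flink code: `SketchFunction` stands in for a ProcessFunction, `SketchKeyedProcessOperator` for the operator wrapping it, and the `Context` is simplified to a single `output(String)` method (no `OutputTag`, no generics). The part it tries to show is the `instanceof` check on the UDF rather than on the operator.

```java
import java.util.ArrayList;
import java.util.List;

// Entry point first, so the file can also be launched as a single-file program.
final class MixinDemo {
    static List<String> run() {
        List<String> out = new ArrayList<>();

        SketchKeyedProcessOperator withHandler =
                new SketchKeyedProcessOperator(new FlushingFunction(), out);
        withHandler.registerTimer("k1", 10L);
        withHandler.finish();

        SketchKeyedProcessOperator withoutHandler =
                new SketchKeyedProcessOperator(new PlainFunction(), out);
        withoutHandler.registerTimer("k2", 20L);
        withoutHandler.finish();

        return out;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}

// Handler shape taken from the mail; the Context below is a simplified assumption.
interface EOFTimersHandler {
    void handleProcessingTimer(long timestamp, Context ctx);

    interface Context {
        void output(String value);

        String getCurrentKey();
    }
}

// Hypothetical marker for a user function (stands in for a ProcessFunction).
interface SketchFunction {}

// A user function that opts in by also implementing the handler (the mixin).
final class FlushingFunction implements SketchFunction, EOFTimersHandler {
    @Override
    public void handleProcessingTimer(long timestamp, Context ctx) {
        ctx.output(ctx.getCurrentKey() + " flushed @" + timestamp);
    }
}

// A user function that does not opt in; its pending timers are simply dropped here.
final class PlainFunction implements SketchFunction {}

// Hypothetical operator: checks whether the UDF, not the operator, implements the handler.
final class SketchKeyedProcessOperator {
    private final SketchFunction udf;
    private final List<String> out;
    private final List<String> keys = new ArrayList<>();
    private final List<Long> timestamps = new ArrayList<>();

    SketchKeyedProcessOperator(SketchFunction udf, List<String> out) {
        this.udf = udf;
        this.out = out;
    }

    void registerTimer(String key, long timestamp) {
        keys.add(key);
        timestamps.add(timestamp);
    }

    void finish() {
        if (udf instanceof EOFTimersHandler) {
            EOFTimersHandler handler = (EOFTimersHandler) udf;
            for (int i = 0; i < keys.size(); i++) {
                final String key = keys.get(i);
                handler.handleProcessingTimer(timestamps.get(i), new EOFTimersHandler.Context() {
                    @Override
                    public void output(String value) {
                        out.add(value);
                    }

                    @Override
                    public String getCurrentKey() {
                        return key;
                    }
                });
            }
        }
        keys.clear();
        timestamps.clear();
    }
}
```

What happens to timers of a function that does not implement the handler is an open choice in this sketch (they are dropped); the thread does not settle that default here.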
>>
>>
>> Ad 3.
>>
>> I don't like having the trigger/cancel methods, because:
>>
>> 1. I don't like the back and forth between system and UDF
>>
>> 2. Yes, the biggest issue I have is with the possibility of registering
>> new timers. I am trying to be on the safe side here. I don't like the idea
>> of dropping them, because it is again making assumptions about what users
>> do with those timers. What if they e.g. emit a counter once it has reached
>> a certain threshold? We'd need an additional flag in the method saying
>> that this is the final timer. My sentiment is that we're making it
>> questionably easier to trigger a timer at the cost of opening up for
>> unforeseen problems with follow-up registration.
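
The follow-up registration concern in point 2 can be illustrated with a small, self-contained sketch. Everything here is hypothetical and invented for illustration (`CountingFunction`, `TimerRegistrar`, the threshold of 3); it is not Flink code. The function counts timer firings, re-registers a timer each time, and only emits once the count reaches the threshold. If termination fires the one pending timer through the normal `onTimer` path and silently drops whatever it registers, the function never emits.

```java
import java.util.ArrayList;
import java.util.List;

// Entry point first, so the file can also be launched as a single-file program.
final class FollowUpTimerDemo {
    static String run() {
        CountingFunction udf = new CountingFunction();
        List<Long> dropped = new ArrayList<>();

        // Termination: fire the single pending timer once and drop anything it registers.
        udf.onTimer(1000L, timestamp -> dropped.add(timestamp));

        return "count=" + udf.count
                + " emitted=" + udf.emitted.size()
                + " dropped=" + dropped.size();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}

// Hypothetical hook through which a function registers its next timer.
interface TimerRegistrar {
    void register(long timestamp);
}

// Emits only after three firings, and relies on re-registration to get there.
final class CountingFunction {
    int count = 0;
    final List<String> emitted = new ArrayList<>();

    void onTimer(long timestamp, TimerRegistrar registrar) {
        count++;
        if (count >= 3) {
            emitted.add("threshold reached");
        } else {
            registrar.register(timestamp + 1000L);
        }
    }
}
```

With a separate end-of-input handler the function could instead be told explicitly that no further timers will fire and decide for itself whether to emit the partial count, which is the direction argued for in this message.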
>>
>> Best,
>>
>> Dawid
>> On 30/11/2022 12:13, Yun Gao wrote:
>>
>> Hi Dawid, Piotr
>>
>> Many thanks for the discussion!
>>
>> As a whole I think we already agree on the callback option, and I don't
>> think I objected to modifying the current internal implementation. But
>> from my side it is still not clear what the actual proposed interfaces
>> are. Let me first try to summarize that a bit:
>>
>> 1) Which object do the handlers register on?
>>
>> It seems there are two options: one is to register on the timer services
>> (InternalTimerService / ProcessingTimerService or some equivalent after
>> refactoring), the other is to make it part of the operator lifecycle.
>> I'm now leaning towards the latter; what do you think about this part?
>>
>> 2) What is the interface of the handler?
>>
>> Option 1 is
>>
>> interface SomeHandlerName {
>>     void processingTimer(Timer timer);
>> }
>>
>> class Timer {
>>     long getTimestamp();
>>     void trigger();
>>     void cancel();
>>     // Other actions if required.
>> }
>>
>> But it seems there is controversy over whether to add actions to the
>> timer class. Without them, my understanding is that the interfaces of
>> Option 2 are
>>
>> interface SomeHandlerName {
>>     void processTimer(Timer timer);
>> }
>>
>> interface KeyedSomeHandlerName<KEY, NAMESPACE> {
>>     void processKeyedTimer(KeyedTimer<KEY, NAMESPACE> timer, Context ctx);
>> }
>>
>> class Timer {
>>     long getTimestamp();
>> }
>>
>> class KeyedTimer<KEY, NAMESPACE> extends Timer {
>>     KEY getKey();
>>     NAMESPACE getNamespace();
>> }
>>
>> interface Context {
>>     void executeAtScheduledTime(Consumer<Timer> handler);
>> }
>>
>> As Piotr has pointed out, if we could eliminate the namespace logic, we
>> could then remove the namespace-related type parameter and method from
>> the interfaces. Do I understand this right?
>>
>> Besides, I still have not fully understood why we should not add the
>> actions to the timer class, considering that in most cases users could
>> implement their logic by simply calling timer.trigger() (I think
>> repeated registration is indeed a problem, but I think we could ignore
>> the timers registered during termination). Could you enlighten me a bit
>> further on this part?
>>
>> Best,
>> Yun Gao
>>
>> ------------------------------------------------------------------
>> From: Piotr Nowojski <pn...@apache.org>
>> Send Time: 2022 Nov. 30 (Wed.) 17:10
>> To: dev <de...@flink.apache.org>
>> Subject: Re: [DISCUSS] FLIP-269: Properly Handling the Processing Timers
>> on Job Termination
>>
>> Hi,
>>
>> I have a couple of remarks.
>>
>> First a general one. For me the important part in the design of this API
>> is how to expose this to Flink users in public interfaces, namely
>> ProcessFunction and StreamOperator. InternalTimerService is an internal
>> class, so we can change it and break it as needed in the future.
>>
>> Registering a handler like the one proposed by Dawid:
>>
>> interface SomeHandlerName {
>>     void onTimer(/* whatever type it is */ timer, Context ctx ) {
>>     }
>> }
>>
>> makes sense to me. For the InternalTimerService I think it doesn't
>> matter too much what we do. We could provide a similar interface as for
>> the ProcessFunction/StreamOperator; it doesn't have to be the same one.
>> On the contrary, I think it shouldn't be the same, as part of this
>> effort we shouldn't be exposing the concept of `Namespaces` to the
>> public facing API.
>>
>> Re the "waitFor". Theoretically I see arguments why users might want to
>> use this, but I'm also not convinced whether that's necessary in
>> practice. I would be +1 either way. The first version can be without
>> this functionality and we can add it later (given that we designed a
>> good place to add it in the future, like the `Context` proposed by
>> Dawid). But I'm also fine adding it now if others are insisting.
>>
>> Best,
>> Piotrek
>>
>> On Wed, 30 Nov 2022 at 09:18, Dawid Wysakowicz <dw...@apache.org> wrote:
>>
>> WindowOperator is not implemented by users. I can see that for
>> InternalTimerService we'll need
>>
>> interface PendingTimerProcessor<KEY, NAMESPACE> {
>>     void onTimer(InternalTimer<KEY, NAMESPACE> timer) {
>>         doHandleTimer(timer);
>>     }
>> }
>>
>> I don't see a problem with that.
>>
>> As you said, ProcessingTimeService is a user facing interface and
>> completely unrelated to the InternalTimerService. I don't see a reason
>> why we'd need to unify those.
>>
>> As for the waitFor behaviour: personally, I have not been convinced it
>> is necessary. Maybe it's just my lack of vision, but I can't think of a
>> scenario where I'd use it. Still, if we need it, I'd go for something
>> like:
>>
>> void onTimer(/* whatever type it is */ timer, Context ctx ) {
>> }
>>
>> interface Context {
>>     void executeAtScheduledTime(Consumer<Timer> handler);
>> }
>>
>> That way you have independent simple interfaces that need to work only
>> in a single well defined scenario and you don't need to match an
>> interface to multiple different cases.
>>
>> Best,
>> Dawid
>>
>> On 30/11/2022 07:27, Yun Gao wrote:
>>
>> Hi Dawid,
>>
>> Thanks for the comments!
>>
>> As a whole I'm also open to the API, and I also prefer simple but
>> flexible interfaces, but it still looks like there are some problems
>> with just letting users implement the termination actions.
>>
>> Let's take the WindowOperator as an example. As seen in [1], the timer
>> processing logic needs to acquire the key / namespace information bound
>> to the timer (which is only supported by the InternalTimerService).
>> Thus if we want users to implement the same logic on termination, we
>> either let users trigger the timer handler directly, or we also allow
>> users to access these pieces of information. If we go in the latter
>> direction, we might need to provide interfaces like
>>
>> interface PendingTimerProcessor<KEY, NAMESPACE> {
>>     void onTimer(Timer<KEY, NAMESPACE> timer) {
>>         doHandleTimer(timer);
>>     }
>> }
>>
>> class Timer<KEY, NAMESPACE> {
>>     long getTimestamp();
>>     KEY getKey();
>>     NAMESPACE getNamespace();
>> }
>>
>> Then we'll have the issue that, since the interface needs to handle both
>> the InternalTimerService and the raw ProcessingTimeService cases, the
>> latter does not have key and namespace information attached, and it
>> would also be a bit inconsistent for users to have to set the KEY and
>> NAMESPACE types.
>>
>> Besides, it looks to me that if we want to implement behaviors like
>> waiting for, it might not be possible to simply reuse the timer handler,
>> so every operator author would have to re-implement such waiting logic.
>>
>> > Moreover it still has the downside that if you call back to the
>> > `onTimer` method after `trigger` you have access to the Context which
>> > lets you register new timers.
>>
>> I think we could simply drop the timers registered after we start
>> processing the pending timers on termination. Logically there should be
>> no new data after termination.
>>
>> > I think I am not convinced by these arguments. First of all I'm afraid
>> > there is no clear distinction in that area what is runtime and what is
>> > not. I always found `AbstractStreamOperator(*)` actually part of
>> > runtime or Flink's internals and thus I don't find
>> > `InternalTimerService` a utility, but a vital part of the system.
>> > Let's be honest it is impossible to implement an operator without
>> > extending from `AbstractStreamOperator*`.
>> >
>> > What would be the problem with having a proper implementation in
>> > `InternalTimerService`? Can't we do it like this?:
>>
>> I think the original paragraph only explains that the interface is
>> harder to support if we allow users to implement arbitrary logic. But
>> since we are now on the same page about the callback option, users will
>> always be able to implement arbitrary logic whether or not we support
>> timer.trigger(), so I think there is no divergence on this point. I also
>> believe we'll finally have some logic similar to the proposed one that
>> drains all the timers and processes them.
>>
>> Best,
>> Yun Gao
>>
>> [1]
>>
>> https://github.com/apache/flink/blob/a7fdab8b23cddf568fa32ee7eb804d7c3eb23a35/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L488
>>
>> ------------------------------------------------------------------
>> From: Dawid Wysakowicz <dw...@apache.org>
>> Send Time: 2022 Nov. 28 (Mon.) 23:33
>> To: dev <de...@flink.apache.org>
>> Subject: Re: [DISCUSS] FLIP-269: Properly Handling the Processing Timers
>> on Job Termination
>>
>> Do we really need to have separate methods for
>> triggering/waiting/cancelling? To me it sounds rather counterintuitive.
>> Why can't users just execute whatever they want in the handler itself
>> instead of additional back and forth with the system? Moreover it still
>> has the downside that if you call back to the `onTimer` method after
>> `trigger` you have access to the Context which lets you register new
>> timers.
>>
>> I find the following approach much simpler:
>>
>> void onTimer(...) {
>>     doHandleTimer(timestamp);
>> }
>>
>> void processPendingTimer(...) {
>>     // trigger
>>     doHandleTimer(timestamp);
>>     // for cancel, simply do nothing...
>> }
>>
>> > Sorry, I might not have made it very clear here. I think the
>> > difficulty with supporting setting the currentKey is a special issue
>> > for the callback options (no matter what the interface is) since it
>> > allows users to execute logic other than the one registered with the
>> > timers. The complexity comes from the fact that currently we have two
>> > levels of TimerServices: the ProcessingTimerService (there is no key)
>> > and the InternalTimerService (with key). Currently only
>> > ProcessingTimerService is exposed to the runtime and
>> > InternalTimerService is much more a utility to implement the operator.
>> > Then with the current code, the runtime could only access the
>> > ProcessingTimerService on termination.
>>
>> I think I am not convinced by these arguments. First of all, I'm afraid there is no clear distinction in that area between what is runtime and what is not. I always found `AbstractStreamOperator(*)` actually part of the runtime or Flink's internals, and thus I don't find `InternalTimerService` a utility, but a vital part of the system. Let's be honest, it is impossible to implement an operator without extending from `AbstractStreamOperator*`.
>>
>> What would be the problem with having a proper implementation in `InternalTimerService`? Can't we do it like this?:
>>
>> AbstractStreamOperator#finish() {
>>     internalTimerService.finish();
>> }
>>
>> InternalTimerService#finish() {
>>     while ((timer = processingTimeTimersQueue.peek()) != null) {
>>         keyContext.setCurrentKey(timer.getKey());
>>         processingTimeTimersQueue.poll();
>>         onEndOfInputHandler.processPendingTimer(timer);
>>     }
>> }
>>
>> If we only execute some predefined actions, we do not need to worry about the implementation of InternalTimerService and can just execute the registered timers. But if we allow users to execute arbitrary logic, we also need to be aware of the InternalTimerServices and parse the key from the timers stored in them. I think we should always have a method to overcome this issue, but supporting the callback options would be more complex.
>>
>> I am not sure having "predefined actions" would be good enough that we do not need to set a key. As a user I'd anyhow expect the proper key to be set in processPendingTimer.
>>
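The drain loop proposed above for `InternalTimerService#finish()` can be sketched in a self-contained form. This is only an illustration under stated assumptions: `SketchTimer`, `PendingTimerHandler` and `SketchTimerService` are hypothetical stand-ins invented for this example, not Flink classes, and the key context is modelled as a plain field.

```java
import java.util.Comparator;
import java.util.PriorityQueue;

// Hypothetical stand-in for an internal timer: a timestamp plus its key.
final class SketchTimer<K> {
    final long timestamp;
    final K key;

    SketchTimer(long timestamp, K key) {
        this.timestamp = timestamp;
        this.key = key;
    }
}

// Hypothetical end-of-input callback, named after the mail's onEndOfInputHandler.
interface PendingTimerHandler<K> {
    void processPendingTimer(SketchTimer<K> timer);
}

// Simplified, hypothetical timer service; not Flink's InternalTimerService.
final class SketchTimerService<K> {
    private final PriorityQueue<SketchTimer<K>> processingTimeTimersQueue =
            new PriorityQueue<>(Comparator.comparingLong((SketchTimer<K> t) -> t.timestamp));
    private final PendingTimerHandler<K> onEndOfInputHandler;
    private K currentKey; // stands in for keyContext.setCurrentKey(...)

    SketchTimerService(PendingTimerHandler<K> onEndOfInputHandler) {
        this.onEndOfInputHandler = onEndOfInputHandler;
    }

    void registerProcessingTimeTimer(long timestamp, K key) {
        processingTimeTimersQueue.add(new SketchTimer<>(timestamp, key));
    }

    K getCurrentKey() {
        return currentKey;
    }

    // Drains pending timers in timestamp order; the key is set before each callback.
    void finish() {
        SketchTimer<K> timer;
        while ((timer = processingTimeTimersQueue.poll()) != null) {
            currentKey = timer.key;
            onEndOfInputHandler.processPendingTimer(timer);
        }
    }
}
```

Because the loop polls until the queue is empty, a handler that registered a new timer during `finish()` would see that timer drained as well, which is the follow-up registration concern raised elsewhere in this thread.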
>> Best,
>> Dawid
>>
>> On 24/11/2022 08:51, Yun Gao wrote:
>>
>> Hi Piotr / Divye,
>>
>> Many thanks for the discussion! First, IMO it seems we have reached consensus on the high-level API: most operators should usually have only one reasonable action for the pending timers on termination, thus we could let the operators implement their own actions with the low-level interface provided. The only exception is the ProcessFunction, with which users might register customized timers, thus users might also define the actions on termination (if I have misunderstandings here, please correct me).
>>
>> For the low-level API, I can see the benefits of the callback options: since in most cases an operator has only one action for all the timers, it's a waste for us to store the same flag for all the timers, also with a lot of code / state format changes. But since it is enough for most users to simply trigger / cancel the timers, it would be redundant for users to implement the logic twice. Thus perhaps we might combine the benefits of the two options. We might have a separate interface:
>>
>> public interface TimerHandlersOnTermination {
>>     void processPendingTimer(Timer timer, long currentTime);
>> }
>>
>> public class Timer {
>>     long getRegisteredTimestamp();
>>     void trigger();
>>     void waitFor();
>>     void cancel();
>> }
>>
>> Then if an operator has implemented the TimerHandlersOnTermination interface, on termination we could call processPendingTimer(xx) for every pending timer. Users might simply trigger / waitFor / cancel it, or execute some other logic if needed. Then for the ProcessFunction we might have a similar interface to processPendingTimer, except we might need to provide the Context / Collector to the ProcessFunction. Do you think this would be a good direction?
>>
>> Also @Piotr
>>
>> I don't see a problem here. Interface doesn't have to reflect that, only the runtime must set the correct key context before executing the handler dealing with the processing time timers at the end of input/time.
>>
>> Sorry, I might not have made it very clear here. I think the difficulty with supporting setting the currentKey is a special issue for the callback options (no matter what the interface is), since it allows users to execute logic other than the one registered with the timers. The complexity comes from the fact that we currently have two levels of TimerServices: the ProcessingTimerService (there is no key) and the InternalTimerService (with key). Currently only the ProcessingTimerService is exposed to the runtime, and the InternalTimerService is much more a utility to implement the operator. Then, with the current code, the runtime could only access the ProcessingTimerService on termination. If we only execute some predefined actions, we do not need to worry about the implementation of InternalTimerService and can just execute the registered timers. But if we allow users to execute arbitrary logic, we also need to be aware of the InternalTimerServices and parse the key from the timers stored in them. I think we should always have a method to overcome this issue, but supporting the callback options would be more complex.
>>
>> Best,
>> Yun Gao
>>
>> ------------------------------------------------------------------
>> From: Divye Kapoor <dk...@pinterest.com.INVALID>
>> Send Time: 2022 Nov. 24 (Thu.) 08:50
>> To: dev <de...@flink.apache.org>
>> Cc: Xenon Development Team <xe...@pinterest.com>
>>
>> Subject:Re: Re: [DISCUSS] FLIP-269: Properly Handling the Processing
>>
>> Timers on Job Termination Sounds good. Looks like we're on the same page.Thanks! Divye On Wed, Nov 23, 2022 at 2:41 AM Piotr Nowojski <pn...@apache.org> <mailto:pnowojski@apache.org > <pn...@apache.org> wrote: Hi Divye Ithink we are mostly on the same page. Just to clarify/rephrase: One thingto think about - on EOF “trigger immediately” will mean that theasynchronous wait timeout timers will also fire - which is undesirable Ididn't mean to fire all timers immediately in all of the built-inoperators. Just that each built-in operator can have a hard coded way(without a way for users to change it) to handle those timers. Windowedoperators would trigger the lingering timers (flush outputs),AsyncWaitOperator could just ignore them. The same way users could registerEOF timer handlers in the ProcessFunction as Dawid Wysakowicz proposed, we(as flink developers) could use the same mechanism to implement anybehaviour we want for the built-in operators. There should be no need toadd any separate mechanism. Best, Piotrek śr., 23 lis 2022 o 08:21 DivyeKapoor <dk...@pinterest.com.invalid> <dk...@pinterest.com.invalid> <mailto:dkapoor@pinterest.com.invalid > <dk...@pinterest.com.invalid> napisał(a): Thanks Yun/Piotrek, Somebrief comments inline below. On Tue, Nov 22, 2022 at 1:37 AM Piotr Nowojski<pn...@apache.org> <pn...@apache.org> <mailto:pnowojski@apache.org > <pn...@apache.org> wrote: Hi, All inall I would agree with Dawid's proposal. +1 We can add the flexibility ofhow to deal with the timers in the low level API via adding a handler - ifsomeone needs to customize it, he will always have a workaround. Note aftergiving it more thought, I agree that registering some handlers is betterthan overloading the register timer method and modifying the timer's state.+1. At the same time, we can force the most sensible semantic that we thinkfor the couple of built-in operators, which should be prettystraightforward (either ignore the timers, or fire them at once). 
I agreethere might be some edge cases, that theoretically user might want to waitfor the timer to fire naturally, but: 1. I'm not sure how common inpractice this will be. If not at all, then why should we be complicatingthe API/system? That’s fair. However, the specifics are very importanthere. One thing to think about - on EOF “trigger immediately” will meanthat the asynchronous wait timeout timers will also fire - which isundesirable (because they are racing with the last async call). However,the issue is cleanly resolved by waiting for the timer to be canceled whenthe last event is processed. (“Wait for” case). Ignoring the timer has theleast justification. Registering the handler as per Dawid’s proposal andhaving that handler unregister the timers on EOF makes best sense. Thissolution also unifies the trigger immediately case as that handler canreregister the timers for early termination. The proposal: 1. Operatorreceives EOF 2. EOF timer handler triggers 3. EOF handler adjusts theregistered timers for early trigger or ignore. If wait-for behavior isdesired, timers are not changed. This is controlled in client code. 4.Operator waits for all timers to drain/trigger. (“Always”). There is nospecial handling for ignore/early trigger. 5. Operator allows job toproceed with shutdown. The only api change needed is an EOF handler. Theother agreement we need is that “Wait for” is the desired behavior inprocessing time and that processing time is fundamentally different fromevent time in this respect. (I have changed my thinking since the lastmail). 2. We can always expand the API in the future, and let the useroverride the default built-in behaviour of the operators via some setter onthe stream transformation (`SingleOutputStreamOperator`), or via somecustom API DSL style in each of the operators separately. This is notrequired. See above. 
Re forcing the same semantics for processing timetimers as for event time ones - this is tempting, but indeed I see apossibility that users need to adhere to some external constraints whenusing processing time. +1. As above, we should consider the 2 casesfundamentally different in this area. Re: Yun - b) Another issue is thatwhat if users use timers with different termination actions in the sameoperator / UDF? For example, users use some kind of timeout (like throwsexception if some thing not happen after some other thing), and also somekind of window aggregation logic. In this case, without additional tags,users might not be able to distinguish which timer should be canceled andwhich time should be triggered ? as above. The EOF handler makes thechoice. 4. How could these scenarios adjust their APIs ? From the currentlisted scenarios, I'm more tend to that as @Dawid pointed out, there mightbe only one expected behavior for each scenario, thus it does not seems toneed to allow users to adjust the behavior. Thus @Divye may I have a doubleconfirmation currently do we have explicit scenarios that is expected tochange the different behaviors for the same scenario? Wait-for behavior isprobably the only expected behavior and any alterations should be from theEOF handler managing the registered timers. Besides @Divye from the listedscenarios, I have another concern for global configuration is that for onejob, different operators seems to still might have different expectedbehaviors. For example, A job using both Window operator andAsyncWaitOperator might have different requirements for timers ontermination? Thank you for raising this case. This changed my thinking.Based on your point, we should try and align on the “Wait-for” with EOFhandler proposal. I’m withdrawing the “single-runtime-config” proposal.Best, Divye
>>
>>

Re: [DISCUSS] FLIP-269: Properly Handling the Processing Timers on Job Termination

Posted by Dawid Wysakowicz <dw...@apache.org>.
Sounds like a good plan to me.

On 08/12/2022 08:58, Yun Gao wrote:
> Hi Dawid,
>
> Many thanks for the discussion, and sorry for the delayed response;
> I was hesitant on some points.
>
> As a whole, after some more thought, I agree that adding the
> trigger() / cancel() methods to some kind of timer object is not
> necessary for us to achieve exactly-once for the operators. We could
> follow the direction of "modifying the implementation of the operators"
> to achieve the same target.
>
> But continuing to think in this direction, it now looks to me that it
> is also not necessary to add the callback to the timer services:
> 1. For InternalTimerService, the operators could just call
> `InternalTimerService#forEachProcessingTimer()` on finish to handle
> the pending timers.
> 2. For the timers registered to the underlying ProcessingTimerService,
> at least in the currently listed scenarios, the operator itself knows
> what the remaining work is (e.g., the FileWriter knows if it has an
> in-progress file to flush).
>
> Operators could handle the remaining timers in the finish() method.
>
> Then the only interface we need to consider is the one added to the
> ProcessFunction. The current interface also looks ok to me.
>
> If you think the above option works, I could first do a PoC that
> demonstrates it is sufficient to only modify the operator
> implementation to handle the remaining work properly on finish().
> If there are new issues I'll post them here and we could have some
> more discussion.
>
> Best,
> Yun Gao
>
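The idea in the message above, that each operator deals with its own pending processing-time timers inside finish(), can be sketched as follows. This is a hedged illustration only: `PendingTimers`, `WindowLikeOperator` and `AsyncLikeOperator` are hypothetical classes made up for this example, and the `forEachProcessingTimer` signature shown here is an assumption modelled on the method name mentioned in the mail, not a confirmed Flink API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongConsumer;

// Hypothetical minimal timer store; it only models iterating over pending timers.
final class PendingTimers {
    private final List<Long> timestamps = new ArrayList<>();

    void register(long timestamp) {
        timestamps.add(timestamp);
    }

    // Assumed signature: visits every pending timer once, then clears them.
    void forEachProcessingTimer(LongConsumer action) {
        for (long timestamp : new ArrayList<>(timestamps)) {
            action.accept(timestamp);
        }
        timestamps.clear();
    }
}

// Window-like operator: pending timers mean unflushed results, so fire them.
final class WindowLikeOperator {
    final PendingTimers timers = new PendingTimers();
    final List<String> output = new ArrayList<>();

    void finish() {
        timers.forEachProcessingTimer(ts -> output.add("window-end@" + ts));
    }
}

// Async-wait-like operator: pending timers are timeouts, so they are dropped.
final class AsyncLikeOperator {
    final PendingTimers timers = new PendingTimers();
    final List<String> output = new ArrayList<>();

    void finish() {
        timers.forEachProcessingTimer(ts -> { /* ignore timeout timers at end of input */ });
    }
}
```

With this shape the decision to fire or ignore lives entirely in the operator implementation, so no callback has to be registered on the timer service.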
>
>     ------------------Original Mail ------------------
>     *Sender:*Dawid Wysakowicz <dw...@apache.org>
>     *Send Date:*Fri Dec 2 21:21:25 2022
>     *Recipients:*Dev <de...@flink.apache.org>
>     *Subject:*Re: [DISCUSS] FLIP-269: Properly Handling the Processing
>     Timers on Job Termination
>
>         Ad. 1
>
>         I'd start with ProcessingTimerService as that's the only
>         public interface. It is exposed in the Sink V2 interface. In
>         this scenario it would be the Sink interface that needs to
>         extend from an EOFTimersHandler. I believe it would be hard to
>         pass it from there to the ProcessingTimeService as it is
>         passed from the outside e.g. in the
>         ProcessingTimeServiceAware. For that reason I'd go with a
>         registration method in that interface.
>
>         In ProcessFunction I'd go with a mixin approach, so a
>         ProcessFunction can extend from EOFTimersHandler. I'd do that
>         because ProcessFunction does not have an init/open method
>         where we could register the handler.
>
>         On operator level I'd have a registration method in
>         InternalTimerService. I believe that's the only way to handle
>         the above ProcessFunction approach. E.g. in
>         KeyedProcessOperator you need to check if the UDF extends from
>         the interface, not the operator itself.
>
>         Ad. 2
>
>         I'd go with
>
>         *(Keyed)ProcessFunction:*
>
>         interface EOFTimersHandler {
>
>          void handleProcessingTimer(long timestamp, Context);
>
>         }
>
>         interface Context {
>                 public abstract <X> void output(OutputTag<X> outputTag, X value);
>
>                 public abstract K getCurrentKey();
>
>         // we can extend it for waitFor later
>
>         }
>
>         *ProcessingTimeService: *
>
>         interface EOFTimersHandler {
>
>          void handleProcessingTimer(long timestamp, Context);
>
>         }
>
>         interface Context {
>
>         // we can extend it for waitFor later
>
>         }
>
>         *InternalTimeService:*
>
>         interface EOFTimersHandler {
>
>          void handleProcessingTimer(InternalTimer<K,N> timer, Context);
>
>         }
>
>         interface Context {
>
>         // we can extend it for waitFor later
>
>         }
>
>         Personally I'd not try to unify those places too much. They
>         also have different visibilities (public/internal) and have
>         access to different sets of metadata (key/namespace).
>
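The mixin approach described under "Ad. 1", where the operator checks whether the user function (not the operator itself) implements the handler interface, might look roughly like this. All types here are simplified, hypothetical stand-ins: `EOFContext` replaces the proposed `Context` and takes a plain `String` instead of an `OutputTag`, and `SketchProcessFunction` / `SketchKeyedProcessOperator` are not Flink classes.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical version of the handler interface proposed in the mail.
interface EOFTimersHandler<K> {
    void handleProcessingTimer(long timestamp, EOFContext<K> ctx);
}

// Simplified context: the mail proposes output(OutputTag<X>, X); a String is used here.
interface EOFContext<K> {
    K getCurrentKey();

    void output(String value);
}

// Hypothetical stand-in for a user function base class without an open/init hook.
abstract class SketchProcessFunction<K> { }

// A user function that opts in by mixing in the handler interface.
final class FlushingFunction extends SketchProcessFunction<String>
        implements EOFTimersHandler<String> {
    @Override
    public void handleProcessingTimer(long timestamp, EOFContext<String> ctx) {
        ctx.output(ctx.getCurrentKey() + " flushed at " + timestamp);
    }
}

// Hypothetical operator: it inspects the UDF, not itself, for the mixin.
final class SketchKeyedProcessOperator<K> {
    private final SketchProcessFunction<K> userFunction;
    final List<String> emitted = new ArrayList<>();

    SketchKeyedProcessOperator(SketchProcessFunction<K> userFunction) {
        this.userFunction = userFunction;
    }

    // Called once per pending timer at end of input.
    void onPendingTimer(long timestamp, K key) {
        if (userFunction instanceof EOFTimersHandler) {
            @SuppressWarnings("unchecked")
            EOFTimersHandler<K> handler = (EOFTimersHandler<K>) userFunction;
            handler.handleProcessingTimer(timestamp, new EOFContext<K>() {
                @Override
                public K getCurrentKey() {
                    return key;
                }

                @Override
                public void output(String value) {
                    emitted.add(value);
                }
            });
        }
        // Assumption of this sketch: without the mixin the timer is simply skipped.
    }
}
```

A function that does not implement the interface is left untouched, so existing user code keeps its current behaviour.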
>
>         Ad 3.
>
>         I don't like having the trigger/cancel methods, because:
>
>         1. I don't like the back and forth between system and UDF
>
>         2. Yes, the biggest issue I have is with the possibility of
>         registering new timers. I am trying to be on the safe side
>         here. I don't like the idea of dropping them, because it is
>         again making assumptions about what users do with those
>         timers. What if they e.g. emit a counter if it reached a
>         certain threshold? We'd need an additional flag in the method
>         saying that it is the final timer. My sentiment is that we're
>         making it questionably easier to trigger a timer at the cost
>         of opening up for unforeseen problems with follow-up
>         registration.
>
>         Best,
>
>         Dawid
>
>         On 30/11/2022 12:13, Yun Gao wrote:
>
>             Hi Dawid, Piotr
>
>             Many thanks for the discussion!
>
>             As a whole I think we are already consistent on the callback option, and I don't think I opposed modifying the current internal implementation. But from my side it is still not clear what the actual proposed interfaces are. Let me first try to summarize that a bit:
>
>             1) Which object do the handlers register on?
>
>             It seems there are two options: one is the timer services (InternalTimerService / ProcessingTimerService or some equivalent things after refactoring), the other one is as a lifecycle of the operator. I'm now tending to the latter one; what do you think on this part?
>
>             2) What is the interface of the handler?
>
>             Option 1 is:
>
>             interface SomeHandlerName {
>                 void processingTimer(Timer timer);
>             }
>
>             class Timer {
>                 long getTimestamp();
>                 void trigger();
>                 void cancel();
>                 // Other actions if required.
>             }
>
>             But it seems there is controversy on whether to add actions to the timer class. Without that, in my understanding the interfaces of Option 2 are:
>
>             interface SomeHandlerName {
>                 void processTimer(Timer timer);
>             }
>
>             interface KeyedSomeHandlerName<KEY, NAMESPACE> {
>                 void processKeyedTimer(KeyedTimer<KEY, NAMESPACE> timer, Context ctx);
>             }
>
>             class Timer {
>                 long getTimestamp();
>             }
>
>             class KeyedTimer<KEY, NAMESPACE> extends Timer {
>                 KEY getKey();
>                 NAMESPACE getNamespace();
>             }
>
>             interface Context {
>                 void executeAtScheduledTime(Consumer<timer> handler);
>             }
>
>             As Piotr has pointed out, if we could eliminate the logic of namespace, we could then remove the namespace-related type parameter and method from the interfaces. Do I understand right?
>
>             Besides, I still haven't fully got the reason why we should not add the actions to the timer class, considering that it seems in most cases users could implement their logic by simply calling timer.trigger() (I think the repeated registration is indeed a problem, but I think we could ignore the timers registered during termination). Could you further enlighten me a bit on this part?
>
>             Best,
>             Yun Gao
>
>             ------------------------------------------------------------------
>             From: Piotr Nowojski <pn...@apache.org>
>             Send Time: 2022 Nov. 30 (Wed.) 17:10
>             To: dev <de...@flink.apache.org>
>             Subject: Re: [DISCUSS] FLIP-269: Properly Handling the Processing Timers on Job Termination
>
>             Hi,
>
>             I have a couple of remarks.
>
>             First a general one. For me the important part in the design of this API is how to expose this to Flink users in public interfaces, namely ProcessFunction and StreamOperator. InternalTimerService is an internal class, so we can change it and break it as needed in the future.
>
>             For registering a handler like proposed by Dawid:
>
>             interface SomeHandlerName {
>                 void onTimer(/* whatever type it is */ timer, Context ctx ) { }
>             }
>
>             makes sense to me. For the InternalTimerService I think it doesn't matter too much what we do. We could provide a similar interface as for the ProcessFunction/StreamOperator, it doesn't have to be the same one. On the contrary, I think it shouldn't be the same, as part of this effort we shouldn't be exposing the concept of `Namespaces` to the public facing API.
>
>             Re the "waitFor". Theoretically I see arguments why users might want to use this, but I'm also not convinced whether that's necessary in practice. I would be +1 either way. First version can be without this functionality and we can add it later (given that we designed a good place to add it in the future, like the `Context` proposed by Dawid). But I'm also fine adding it now if others are insisting.
>
>             Best,
>             Piotrek
>
>             On Wed, 30 Nov 2022 at 09:18, Dawid Wysakowicz <dw...@apache.org> wrote:
>
>                 WindowOperator is not implemented by users. I can see that for InternalTimerService we'll need
>
>                 interface PendingTimerProcessor<KEY, NAMESPACE> {
>                     void onTimer(InternalTimer<KEY, NAMESPACE> timer) {
>                         doHandleTimer(timer);
>                     }
>                 }
>
>                 I don't see a problem with that.
>
>                 As you said ProcessingTimeService is a user facing interface and completely unrelated to the InternalTimerService. I don't see a reason why we'd need to unify those.
>
>                 As for the waitFor behaviour. Personally, I have not been convinced it is necessary. Maybe it's just my lack of vision, but I can't think of a scenario where I'd use it. Still if we need it, I'd go for something like:
>
>                 void onTimer(/* whatever type it is */ timer, Context ctx ) {
>                 }
>
>                 interface Context {
>                     void executeAtScheduledTime(Consumer<timer> handler);
>                 }
>
>                 That way you have independent simple interfaces that need to work only in a single well defined scenario and you don't need to match an interface to multiple different cases.
>
>                 Best,
>                 Dawid
>
>                 On 30/11/2022 07:27, Yun Gao wrote:
>
>                     Hi Dawid,
>
>                     Thanks for the comments!
>
>                     As a whole I'm also open to the API and I also prefer to use simple but flexible interfaces, but it still looks like there are some problems with just letting users implement the termination actions.
>
>                     Let's take the WindowOperator as an example. As seen in [1], in the timer processing logic it needs to acquire the key / namespace information bound to the timer (which is only supported by the InternalTimerService). Thus if we want users to implement the same logic on termination, we either let users trigger the timer handler directly or we also allow users to access these pieces of information. If we go in the latter direction, we might need to provide interfaces like
>
>                     interface PendingTimerProcessor<KEY, NAMESPACE> {
>                         void onTimer(Timer<KEY, NAMESPACE> timer) {
>                             doHandleTimer(timer);
>                         }
>                     }
>
>                     class Timer<KEY, NAMESPACE> {
>                         long getTimestamp();
>                         KEY getKey();
>                         NAMESPACE getNamespace();
>                     }
>
>                     Then we'll have the issue that since we need the interface to handle both the cases of InternalTimerService and raw ProcessingTimeService, the latter does not have key and namespace information attached, and it would also be a bit inconsistent for users to have to set the KEY and NAMESPACE types.
>
>                     Besides, it looks to me that if we want to implement behaviors like waiting for, it might not be possible to simply reuse the timer handler, so it requires every operator author to re-implement such waiting logic.
>
>                         Moreover it still have the downside that if you call back to the
>
>                 `onTimer` method after
>
>                         `trigger` you have access to the Context which lets you register new
>
>                 timers.
>
>                     I think we could simply drop the timers registered after we start processing the pending timers on termination. Logically there should be no new data after termination.
>
>                         I think I am not convinced to these arguments. First of all I'm afraid
>
>                 there is no clear distinction
>
>                         in that area what is runtime and what is not. I always found
>
>                 `AbstractStreamOperator(*)` actually part
>
>                         of runtime or Flink's internals and thus I don't find
>
>                 `InternalTimerService` a utility, but a vital part
>
>                         of the system. Let's be honest it is impossible to implement an
>
>                 operator without extending from
>
>                         `AbstractStreamOperator*`.What would be the problem with having a
>
>                 proper implementation in
>
>                         `InternalTimerService`? Can't we do it like this?:
>
>                     I think the original paragraph is only an explanation that the interface is harder to support if we allow the users to implement arbitrary logic. But since we are now on the same page with the callback option, users could always be allowed to implement arbitrary logic no matter whether we support timer.trigger() or not, thus I think now there is no divergence on this point. I also believe we'll finally have some logic similar to the proposed one that drains all the timers and processes them.
>
>                     Best,
>                     Yun Gao
>
>                     [1] https://github.com/apache/flink/blob/a7fdab8b23cddf568fa32ee7eb804d7c3eb23a35/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L488
>
>
>                     ------------------------------------------------------------------
>                     From: Dawid Wysakowicz <dw...@apache.org>
>                     Send Time: 2022 Nov. 28 (Mon.) 23:33
>                     To: dev <de...@flink.apache.org>
>                     Subject: Re: [DISCUSS] FLIP-269: Properly Handling the Processing Timers on Job Termination
>
>                     Do we really need to have separate methods for triggering/waiting/cancelling? To me it sounds rather counterintuitive. Why can't users just execute whatever they want in the handler itself instead of additional back and forth with the system? Moreover, it still has the downside that if you call back to the `onTimer` method after `trigger`, you have access to the Context, which lets you register new timers.
>
>                     I find the following approach much simpler:
>
>                     void onTimer(...) {
>                         doHandleTimer(timestamp);
>                     }
>
>                     void processPendingTimer(...) {
>                         // trigger
>                         doHandleTimer(timestamp);
>                         // for cancel, simply do nothing...
>                     }
>
>                     Sorry, I might not have made it very clear here. I think the difficulty with supporting setting the currentKey is a special issue for the callback options (no matter what the interface is), since it allows users to execute logic other than the one registered with the timers. The complexity comes from the fact that we currently have two levels of TimerServices: the ProcessingTimerService (there is no key) and the InternalTimerService (with key). Currently only the ProcessingTimerService is exposed to the runtime, and the InternalTimerService is much more a utility to implement the operator. Then, with the current code, the runtime could only access the ProcessingTimerService on termination.
>
>                     I think I am not convinced by these arguments. First of all, I'm afraid there is no clear distinction in that area between what is runtime and what is not. I always found `AbstractStreamOperator(*)` actually part of the runtime or Flink's internals, and thus I don't find `InternalTimerService` a utility, but a vital part of the system. Let's be honest, it is impossible to implement an operator without extending from `AbstractStreamOperator*`.
>
>                     What would be the problem with having a proper implementation in `InternalTimerService`? Can't we do it like this?:
>
>                     AbstractStreamOperator#finish() {
>                         internalTimerService.finish();
>                     }
>
>                     InternalTimerService#finish() {
>                         while ((timer = processingTimeTimersQueue.peek()) != null) {
>                             keyContext.setCurrentKey(timer.getKey());
>                             processingTimeTimersQueue.poll();
>                             onEndOfInputHandler.processPendingTimer(timer);
>                         }
>                     }
>
>                     If we only execute some predefined actions, we do not need to worry about the implementation of InternalTimerService and can just execute the registered timers. But if we allow users to execute arbitrary logic, we also need to be aware of the InternalTimerServices and parse the key from the timers stored in them. I think we should always have a method to overcome this issue, but supporting the callback options would be more complex.
>
>                     I am not sure having "predefined actions" would be good enough that we do not need to set a key. As a user I'd anyhow expect the proper key to be set in processPendingTimer.
>
>                     Best,DawidOn 24/11/2022 08:51, Yun Gao wrote:Hi Piotr / Divye, Very thanks for the discussion! First IMO it seems we
>
>                 have reached the consensus on the high-level API: Most operators shouldusually have only one reasonable action to the pending timers ontermination, thus we could let the operators to implement its own actionswith the low-level interface provided. The only exception is theProcessFunction, with which users might register customized timers, thususers might also defines the actions on termination (If I havemisunderstandings here, please correct me). For the low-level API, I couldget the benefits with the callback options: since in most cases an operatorhas only one action to all the timers, its a waste for us to store the sameflag for all the timers, also with a lot of code / state format changes.But since it is enough for most users to simply trigger / cacnel thetimers, it would be redundant for users to implement the logic twice. Thusperhaps we might combine the benefits of the two options: We might have aseparate interface public interface TimerHandlersOnTermination { voidprocessPendingTimer(Timer timer, long currentTime); } public class Timer {long getRegisteredTimestamp(); void trigger(); void waitFor(); voidcancel(); } Then if an operator have implemented theTimerHandlersOnTermination interface, on termination we could callprocessPendingTimer(xx) for every pending timers. Users might simplytrigger / waitFor / cancel it, or execute some other logics if needed. Thenfor the ProcessFunction we might have a similar interface toprocessPendingTimer, except we might need to provide Context / Collector tothe ProcessFunction. Do you think this would be a good direction? Also@Piotr I don't see a problem here. Interface doesn't have to reflect that,only the runtime must set the correct key context before executing thehandler dealing with the processing time timers at the end of input/time.Sorry I might not make it very clear here. 
> I think the difficulty with supporting the setting of the currentKey is a special issue for the callback options (no matter what the interface is) since it allows users to execute logic other than the one registered with the timers. The complexity comes from that currently we have two level of TimerServices: The ProcessingTimerService (there is no key) and InternalTimerService (with key). Currently only ProcessingTimerService is exposed to the runtime and InternalTimerService is much more a utility to implement the operator. Then with the current code, the runtime could only access to ProcessingTimerService on termination. If we only executes some predefined actions, we do not need to worry about the implementation of InternalTimerService and just execute the registered timers. But if we allow users to execute arbitrary logic, we need to be also aware of the InternalTimerServices and parse the key from the timers stored in it. I think we should always have method to overcome this issue, but to support the callback options would be more complex.
>
> Best,
> Yun Gao
>
> ------------------------------------------------------------------
> From: Divye Kapoor <dk...@pinterest.com.INVALID>
> Send Time: 2022 Nov. 24 (Thu.) 08:50
> To: dev <de...@flink.apache.org>
> Cc: Xenon Development Team <xe...@pinterest.com>
> Subject: Re: Re: [DISCUSS] FLIP-269: Properly Handling the Processing Timers on Job Termination
>
> Sounds good. Looks like we're on the same page. Thanks!
>
> Divye
>
> On Wed, Nov 23, 2022 at 2:41 AM Piotr Nowojski <pn...@apache.org> wrote:
>
> Hi Divye
>
> I think we are mostly on the same page. Just to clarify/rephrase:
>
> > One thing to think about - on EOF “trigger immediately” will mean that the asynchronous wait timeout timers will also fire - which is undesirable
>
> I didn't mean to fire all timers immediately in all of the built-in operators. Just that each built-in operator can have a hard coded way (without a way for users to change it) to handle those timers. Windowed operators would trigger the lingering timers (flush outputs), AsyncWaitOperator could just ignore them. The same way users could register EOF timer handlers in the ProcessFunction as Dawid Wysakowicz proposed, we (as flink developers) could use the same mechanism to implement any behaviour we want for the built-in operators. There should be no need to add any separate mechanism.
>
> Best, Piotrek
>
> Wed, 23 Nov 2022 at 08:21 Divye Kapoor <dk...@pinterest.com.invalid> wrote:
>
> Thanks Yun/Piotrek, Some brief comments inline below.
>
> On Tue, Nov 22, 2022 at 1:37 AM Piotr Nowojski <pn...@apache.org> wrote:
>
> > Hi, All in all I would agree with Dawid's proposal.
>
> +1
>
> > We can add the flexibility of how to deal with the timers in the low level API via adding a handler - if someone needs to customize it, he will always have a workaround. Note after giving it more thought, I agree that registering some handlers is better than overloading the register timer method and modifying the timer's state.
>
> +1.
>
> > At the same time, we can force the most sensible semantic that we think for the couple of built-in operators, which should be pretty straightforward (either ignore the timers, or fire them at once). I agree there might be some edge cases, that theoretically user might want to wait for the timer to fire naturally, but: 1. I'm not sure how common in practice this will be. If not at all, then why should we be complicating the API/system?
>
> That’s fair. However, the specifics are very important here. One thing to think about - on EOF “trigger immediately” will mean that the asynchronous wait timeout timers will also fire - which is undesirable (because they are racing with the last async call). However, the issue is cleanly resolved by waiting for the timer to be canceled when the last event is processed. (“Wait for” case). Ignoring the timer has the least justification. Registering the handler as per Dawid’s proposal and having that handler unregister the timers on EOF makes best sense. This solution also unifies the trigger immediately case as that handler can reregister the timers for early termination.
>
> The proposal:
> 1. Operator receives EOF
> 2. EOF timer handler triggers
> 3. EOF handler adjusts the registered timers for early trigger or ignore. If wait-for behavior is desired, timers are not changed. This is controlled in client code.
> 4. Operator waits for all timers to drain/trigger. (“Always”). There is no special handling for ignore/early trigger.
> 5. Operator allows job to proceed with shutdown.
>
> The only api change needed is an EOF handler. The other agreement we need is that “Wait for” is the desired behavior in processing time and that processing time is fundamentally different from event time in this respect. (I have changed my thinking since the last mail).
>
> > 2. We can always expand the API in the future, and let the user override the default built-in behaviour of the operators via some setter on the stream transformation (`SingleOutputStreamOperator`), or via some custom API DSL style in each of the operators separately.
>
> This is not required. See above.
>
> > Re forcing the same semantics for processing time timers as for event time ones - this is tempting, but indeed I see a possibility that users need to adhere to some external constraints when using processing time.
>
> +1. As above, we should consider the 2 cases fundamentally different in this area.
>
> Re: Yun -
>
> > b) Another issue is that what if users use timers with different termination actions in the same operator / UDF? For example, users use some kind of timeout (like throws exception if some thing not happen after some other thing), and also some kind of window aggregation logic. In this case, without additional tags, users might not be able to distinguish which timer should be canceled and which time should be triggered ?
>
> as above. The EOF handler makes the choice.
>
> > 4. How could these scenarios adjust their APIs ? From the current listed scenarios, I'm more tend to that as @Dawid pointed out, there might be only one expected behavior for each scenario, thus it does not seems to need to allow users to adjust the behavior. Thus @Divye may I have a double confirmation currently do we have explicit scenarios that is expected to change the different behaviors for the same scenario?
>
> Wait-for behavior is probably the only expected behavior and any alterations should be from the EOF handler managing the registered timers.
>
> > Besides @Divye from the listed scenarios, I have another concern for global configuration is that for one job, different operators seems to still might have different expected behaviors. For example, A job using both Window operator and AsyncWaitOperator might have different requirements for timers on termination?
>
> Thank you for raising this case. This changed my thinking. Based on your point, we should try and align on the “Wait-for” with EOF handler proposal. I’m withdrawing the “single-runtime-config” proposal.
>
> Best, Divye
>

Re: Re: [DISCUSS] FLIP-269: Properly Handling the Processing Timers on Job Termination

Posted by Yun Gao <yu...@aliyun.com.INVALID>.
Hi Dawid,

Many thanks for the discussion, and sorry for the delayed response;
I was hesitating on some points.

But on the whole, after some more thought, I first agree that adding
the trigger() / cancel() methods to some kind of timer object is not necessary
for us to achieve exactly-once for the operators. We could follow the
direction of "modifying the implementation of the operators" to achieve the
same target.

But continuing to think in this direction, it now looks to me that it is also not
necessary to add the callback to the timer services:
1. For InternalTimerService, the operators could just call
`InternalTimerService#forEachProcessingTimer()` on finish to handle the pending timers.
2. For the timers registered to the underlying ProcessingTimerService, at least in
the currently listed scenarios, the operator itself knows what the remaining work is
(e.g., the FileWriter knows if it has an in-progress file to flush).

Operators could handle the remaining timers in finish() method. 
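
A minimal sketch of this idea, assuming a toy timer service rather than Flink's real classes: `SketchTimerService`, `SketchWindowOperator` and their methods are hypothetical names used only for illustration, and `forEachProcessingTimer` here merely mirrors the method name mentioned above. The operator decides in its own finish() what to do with whatever timers are still pending (here: fire them all, as a window-like operator might).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.LongConsumer;

/** Hypothetical stand-in for a timer service; not Flink's InternalTimerService. */
class SketchTimerService {
    private final PriorityQueue<Long> processingTimers = new PriorityQueue<>();

    void registerProcessingTimer(long timestamp) {
        processingTimers.add(timestamp);
    }

    /** Visits the pending timers in timestamp order without removing them. */
    void forEachProcessingTimer(LongConsumer visitor) {
        List<Long> snapshot = new ArrayList<>(processingTimers);
        snapshot.sort(null); // natural ordering of Long
        for (long ts : snapshot) {
            visitor.accept(ts);
        }
    }

    void clear() {
        processingTimers.clear();
    }
}

/** Hypothetical window-like operator that flushes pending timers in finish(). */
class SketchWindowOperator {
    final SketchTimerService timers = new SketchTimerService();
    final List<String> output = new ArrayList<>();

    /** The regular timer callback; reused unchanged at end of input. */
    void onProcessingTime(long timestamp) {
        output.add("window@" + timestamp);
    }

    /** Assumption: this operator's one sensible action is to fire everything. */
    void finish() {
        timers.forEachProcessingTimer(this::onProcessingTime);
        timers.clear();
    }
}
```

An operator with a different policy (for example one that ignores its timeout timers) would simply call `timers.clear()` in finish() without visiting the timers first.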

Then the only interface we need to consider is the one added to the ProcessFunction. The
current interface also looks ok to me.

If you think the above option works, I could first do a PoC that demonstrates it is sufficient
to only modify the operator implementation to handle the remaining work properly on
finish(). If there are new issues I'll post them here and we could have some more discussion.

Best,
Yun Gao



 ------------------Original Mail ------------------
Sender:Dawid Wysakowicz <dw...@apache.org>
Send Date:Fri Dec 2 21:21:25 2022
Recipients:Dev <de...@flink.apache.org>
Subject:Re: [DISCUSS] FLIP-269: Properly Handling the Processing Timers on Job Termination
 
Ad. 1
I'd start with ProcessingTimerService as that's the only public interface. It is exposed in the Sink V2 interface. In this scenario it would be the Sink interface that needs to extend from an EOFTimersHandler. I believe it would be hard to pass it from there to the ProcessingTimeService as it is passed from the outside e.g. in the ProcessingTimeServiceAware. For that reason I'd go with a registration method in that interface.
In ProcessFunction I'd go with a mixin approach, so a ProcessFunction can extend from EOFTimersHandler. I'd do that because ProcessFunction does not have an init/open method where we could register the handler.
On operator level I'd have a registration method in InternalTimerService. I believe that's the only way to handle the above ProcessFunction approach. E.g. in KeyedProcessOperator you need to check if the UDF extends from the interface, not the operator itself.
Ad. 2
I'd go with
(Keyed)ProcessFunction:
interface EOFTimersHandler {
 void handleProcessingTimer(long timestamp, Context);
}
interface Context {
         public abstract <X> void output(OutputTag<X> outputTag, X value);

         public abstract K getCurrentKey();
// we can extend it for waitFor later
}
ProcessingTimeService: 
interface EOFTimersHandler {
 void handleProcessingTimer(long timestamp, Context);
}
interface Context {
// we can extend it for waitFor later
}
InternalTimeService:
interface EOFTimersHandler {
 void handleProcessingTimer(InternalTimer<K,N> timer, Context);
}
interface Context {
// we can extend it for waitFor later
}
Personally I'd not try to unify those places too much. They have also different visibilities (public/internal), have access to different set of metadata (key/namespace).
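
To make the mixin idea from Ad. 1 concrete, here is a small self-contained sketch. It is an illustration under assumptions, not Flink code: `SketchFunction`, `SketchKeyedOperator`, `EOFContext` and `firePendingAtEndOfInput` are hypothetical names, and the output method is simplified to plain strings. The point is only that the operator, not the user function, checks whether the UDF opted in by implementing the handler interface.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical mixin, following the shape proposed in the thread. */
interface EOFTimersHandler<K> {
    void handleProcessingTimer(long timestamp, EOFContext<K> ctx);
}

/** Simplified context: current key plus a string-only output. */
interface EOFContext<K> {
    K getCurrentKey();
    void output(String value);
}

/** Stand-in for a user function; not Flink's KeyedProcessFunction. */
interface SketchFunction {}

/** A user function that opts in by also implementing the mixin. */
class FlushingFunction implements SketchFunction, EOFTimersHandler<String> {
    @Override
    public void handleProcessingTimer(long timestamp, EOFContext<String> ctx) {
        ctx.output(ctx.getCurrentKey() + "@" + timestamp);
    }
}

/** Stand-in operator that inspects the UDF, as described for KeyedProcessOperator. */
class SketchKeyedOperator {
    private final EOFTimersHandler<String> handler; // null if the UDF did not opt in
    final List<String> emitted = new ArrayList<>();

    @SuppressWarnings("unchecked")
    SketchKeyedOperator(SketchFunction udf) {
        this.handler =
                (udf instanceof EOFTimersHandler) ? (EOFTimersHandler<String>) udf : null;
    }

    /** Called once per pending timer at end of input; a no-op without a handler. */
    void firePendingAtEndOfInput(String key, long timestamp) {
        if (handler == null) {
            return;
        }
        handler.handleProcessingTimer(timestamp, new EOFContext<String>() {
            @Override
            public String getCurrentKey() { return key; }

            @Override
            public void output(String value) { emitted.add(value); }
        });
    }
}
```

A function that does not implement the mixin leaves the operator's pending timers untouched, which matches the "registration only if the UDF asks for it" reading of the proposal.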

Ad 3.
I don't like having the trigger/cancel methods, because:
1. I don't like the back and forth between system and UDF
2. Yes, the biggest issue I have is with the possibility of registering new timers. I am trying to be on the safe side here. I don't like the idea of dropping them, because it is again making assumptions about what users do with those timers. What if they e.g. emit a counter if it reached a certain threshold? We'd need an additional flag in the method saying that this is the final timer. My sentiment is that we're making it questionably easier to trigger a timer at the cost of opening up for unforeseen problems with follow-up registration.
Best,
Dawid
On 30/11/2022 12:13, Yun Gao wrote:

Hi Dawid, Piotr

Very thanks for the discussion!

As a whole I think we are already consistent with the callback option, and I don't think I opposed that we could modify the current internal implementation. But from my side it is still not clear what the actual interfaces are proposing. Let me first try to summarize that a bit:

1) Which object does the handlers register on?

It seems there are two options, one is to timer services (InternalTimerService / ProcessingTimerService or some equivalent things after refactoring), the other one is as a lifecycle of the operator. I'm now tending to the latter one, how do you think on this part?

2) What is the interface of the handler?

Option 1 is that

interface SomeHandlerName {
  void processingTimer(Timer timer);
}

class Timer {
  long getTimestamp();
  void trigger();
  void cancel();
  // Other actions if required.
}

But it seems there is controversy on whether to add actions to the timer class. If without that, with my understanding the interfaces of the Option 2 are

interface SomeHandlerName {
  void processTimer(Timer timer);
}

interface KeyedSomeHandlerName<KEY, NAMESPACE> {
  void processKeyedTimer(KeyedTimer<KEY, NAMESPACE> timer, Context ctx);
}

class Timer {
  long getTimestamp();
}

class KeyedTimer<KEY, NAMESPACE> extends Timer {
  KEY getKey();
  NAMESPACE getNamespace();
}

interface Context {
  void executeAtScheduledTime(Consumer<timer> handler);
}

As Piotr has pointed out, if we could eliminate the logic of namespace, we could then remove the namespace related type parameter and method from the interfaces.

Do I understand right?

Besides, I still have not fully got the reason why we should not add the actions to the timer class, in consideration that it seems in most cases users could implement their logic by simply calling timer.trigger() (I think the repeat registration is indeed a problem, but I think we could ignore the timers registered during termination). Could you further enlighten me a bit on this part?

Best,
Yun Gao

------------------------------------------------------------------
From: Piotr Nowojski <pn...@apache.org>
Send Time: 2022 Nov. 30 (Wed.) 17:10
To: dev <de...@flink.apache.org>
Subject: Re: [DISCUSS] FLIP-269: Properly Handling the Processing Timers on Job Termination

Hi,

I have a couple of remarks.

First a general one. For me the important part in the design of this API is how to expose this to Flink users in public interfaces. Namely ProcessFunction and StreamOperator. InternalTimerService is an internal class, so we can change it and break it as needed in the future.

For registering a handler like proposed by Dawid:

interface SomeHandlerName {
  void onTimer(/* whatever type it is */ timer, Context ctx) { }
}

makes sense to me. For the InternalTimerService I think it doesn't matter too much what we do. We could provide a similar interface as for the ProcessFunction/StreamOperator, it doesn't have to be the same one. On the contrary, I think it shouldn't be the same, as part of this effort we shouldn't be exposing the concept of `Namespaces` to the public facing API.

Re the "waitFor". Theoretically I see arguments why users might want to use this, but I'm also not convinced whether that's necessary in practice. I would be +1 either way. First version can be without this functionality and we can add it later (given that we designed a good place to add it in the future, like the `Context` proposed by Dawid). But I'm also fine adding it now if others are insisting.

Best,
Piotrek

Wed, 30 Nov 2022 at 09:18 Dawid Wysakowicz <dw...@apache.org> wrote:

WindowOperator is not implemented by users. I can see that for InternalTimerService we'll need

interface PendingTimerProcessor<KEY, NAMESPACE> {
  void onTimer(InternalTimer<KEY, NAMESPACE> timer) {
    doHandleTimer(timer);
  }
}

I don't see a problem with that.

As you said ProcessingTimeService is a user facing interface and completely unrelated to the InternalTimerService. I don't see a reason why we'd need to unify those.

As for the waitFor behaviour. Personally, I have not been convinced it is necessary. Maybe it's just my lack of vision, but I can't think of a scenario where I'd use it. Still if we need it, I'd go for something like:

void onTimer(/* whatever type it is */ timer, Context ctx) {
}

interface Context {
  void executeAtScheduledTime(Consumer<timer> handler);
}

That way you have independent simple interfaces that need to work only in a single well defined scenario and you don't need to match an interface to multiple different cases.

Best,
Dawid

On 30/11/2022 07:27, Yun Gao wrote:
Hi Dawid,

Thanks for the comments!

As a whole I'm also open to the API and I also prefer to use simple but flexible interfaces, but it still looks there are some problem to just let users to implement the termination actions.

Let's take the WindowOperator as an example. As seen in [1], in the timer processing logic it needs to acquire the key / namespace information bound to the timer (which is only supported by the InternalTimerService). Thus if we want users to implement the same logic on termination, we either let users to trigger the timer handler directly or we also allows users to access these piece of information. If we go with the latter direction, we might need to provide interfaces like

interface PendingTimerProcessor<KEY, NAMESPACE> {
  void onTimer(Timer<KEY, NAMESPACE> timer) {
    doHandleTimer(timer);
  }
}

class Timer<KEY, NAMESPACE> {
  long getTimestamp();
  KEY getKey();
  NAMESPACE getNamespace();
}

Then we'll have the issue that since we need the interface to handle both of cases of InternalTimerService and raw ProcessingTimeService, the latter do not have key and namespace information attached, and it would also be a bit inconsistent for users to have to set the KEY and NAMESPACE types.

Besides, it looks to me that if we want to implement behaviors like waiting for, it might not simply reuse the timer handler, then it requires every operator authors to re-implement such waiting logics.

> Moreover it still have the downside that if you call back to the `onTimer` method after `trigger` you have access to the Context which lets you register new timers.

I think we could simply drop the timers registered during we start processing the pending timers on termination. Logically there should be no new data after termination.

> I think I am not convinced to these arguments. First of all I'm afraid there is no clear distinction in that area what is runtime and what is not. I always found `AbstractStreamOperator(*)` actually part of runtime or Flink's internals and thus I don't find `InternalTimerService` a utility, but a vital part of the system. Let's be honest it is impossible to implement an operator without extending from `AbstractStreamOperator*`. What would be the problem with having a proper implementation in `InternalTimerService`? Can't we do it like this?:

I think the original paragraph is only explanation to that the interface is harder to support if we allows the users to implement the arbitrary logic. But since now we are at the page with the callback option, users could always be allowed to implement arbitrary logic no matter we support timer.trigger() or not, thus I think now there is no divergence on this point. I also believe in we'll finally have some logic similar to the proposed one that drain all the timers and process it.

Best,
Yun Gao

[1] https://github.com/apache/flink/blob/a7fdab8b23cddf568fa32ee7eb804d7c3eb23a35/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L488

------------------------------------------------------------------
From: Dawid Wysakowicz <dw...@apache.org>
Send Time: 2022 Nov. 28 (Mon.) 23:33
To: dev <de...@flink.apache.org>
Subject: Re: [DISCUSS] FLIP-269: Properly Handling the Processing Timers on Job Termination

Do we really need to have separate methods for triggering/waiting/cancelling. To me it sounds rather counterintuitive. Why can't users just execute whatever they want in the handler itself instead of additional back and forth with the system? Moreover it still have the downside that if you call back to the `onTimer` method after `trigger` you have access to the Context which lets you register new timers.

I find following approach much simpler:

void onTimer(...) {
  doHandleTimer(timestamp);
}

void processPendingTimer(...) {
  // trigger
  doHandleTimer(timestamp);
  // for cancel, simply do nothing...
}

> Sorry I might not make it very clear here. I think the difficulty with supporting the setting of the currentKey is a special issue for the callback options (no matter what the interface is) since it allows users to execute logic other than the one registered with the timers. The complexity comes from that currently we have two level of TimerServices: The ProcessingTimerService (there is no key) and InternalTimerService (with key). Currently only ProcessingTimerService is exposed to the runtime and InternalTimerService is much more a utility to implement the operator. Then with the current code, the runtime could only access to ProcessingTimerService on termination.

I think I am not convinced to these arguments. First of all I'm afraid there is no clear distinction in that area what is runtime and what is not. I always found `AbstractStreamOperator(*)` actually part of runtime or Flink's internals and thus I don't find `InternalTimerService` a utility, but a vital part of the system. Let's be honest it is impossible to implement an operator without extending from `AbstractStreamOperator*`.

What would be the problem with having a proper implementation in `InternalTimerService`? Can't we do it like this?:

AbstractStreamOperator#finish() {
  internalTimerService.finish();
}

InternalTimerService#finish() {
  while ((timer = processingTimeTimersQueue.peek()) != null) {
    keyContext.setCurrentKey(timer.getKey());
    processingTimeTimersQueue.poll();
    onEndOfInputHandler.processPendingTimer(timer);
  }
}

> If we only executes some predefined actions, we do not need to worry about the implementation of InternalTimerService and just execute the registered timers. But if we allow users to execute arbitrary logic, we need to be also aware of the InternalTimerServices and parse the key from the timers stored in it. I think we should always have method to overcome this issue, but to support the callback options would be more complex.

I am not sure, having "predefined actions" would be good enough that we do not need to set a key. As a user I'd anyhow expect the proper key to be set in processPendingTimer.

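
The finish() loop discussed above can be sketched in self-contained form as follows. This is a toy model under stated assumptions, not Flink's implementation: `KeyedTimerSketch`, `PendingTimerHandler` and `DrainingTimerService` are hypothetical names, the key context is modelled as a plain field, and dropping timers registered during the drain is only one of the options debated in this thread (it is the behaviour Yun Gao suggested and Dawid had reservations about).

```java
import java.util.Comparator;
import java.util.PriorityQueue;

/** Hypothetical keyed timer; stands in for Flink's InternalTimer. */
final class KeyedTimerSketch {
    final String key;
    final long timestamp;

    KeyedTimerSketch(String key, long timestamp) {
        this.key = key;
        this.timestamp = timestamp;
    }
}

/** Hypothetical callback invoked once per pending timer at end of input. */
interface PendingTimerHandler {
    void processPendingTimer(KeyedTimerSketch timer);
}

/** Toy timer service that drains its processing-time queue in finish(). */
class DrainingTimerService {
    private final PriorityQueue<KeyedTimerSketch> queue =
            new PriorityQueue<>(Comparator.comparingLong((KeyedTimerSketch t) -> t.timestamp));
    private final PendingTimerHandler handler;
    private boolean finishing = false;

    /** Plays the role of the operator's key context. */
    String currentKey;

    DrainingTimerService(PendingTimerHandler handler) {
        this.handler = handler;
    }

    void register(String key, long timestamp) {
        if (finishing) {
            return; // assumption: timers registered while draining are dropped
        }
        queue.add(new KeyedTimerSketch(key, timestamp));
    }

    void finish() {
        finishing = true;
        KeyedTimerSketch timer;
        while ((timer = queue.poll()) != null) {
            currentKey = timer.key; // set the key context before calling user code
            handler.processPendingTimer(timer);
        }
    }
}
```

Because the key is set before each callback, the handler sees the same key it would have seen had the timer fired normally, which is the expectation voiced in the last paragraph above.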
Best,
Dawid

On 24/11/2022 08:51, Yun Gao wrote:

Hi Piotr / Divye,

Very thanks for the discussion!

First IMO it seems we have reached the consensus on the high-level API: Most operators should usually have only one reasonable action to the pending timers on termination, thus we could let the operators to implement its own actions with the low-level interface provided. The only exception is the ProcessFunction, with which users might register customized timers, thus users might also defines the actions on termination (If I have misunderstandings here, please correct me).

For the low-level API, I could get the benefits with the callback options: since in most cases an operator has only one action to all the timers, it's a waste for us to store the same flag for all the timers, also with a lot of code / state format changes. But since it is enough for most users to simply trigger / cancel the timers, it would be redundant for users to implement the logic twice. Thus perhaps we might combine the benefits of the two options: We might have a separate interface

  public interface TimerHandlersOnTermination {
    void processPendingTimer(Timer timer, long currentTime);
  }

  public class Timer {
    long getRegisteredTimestamp();
    void trigger();
    void waitFor();
    void cancel();
  }

Then if an operator have implemented the TimerHandlersOnTermination interface, on termination we could call processPendingTimer(xx) for every pending timers. Users might simply trigger / waitFor / cancel it, or execute some other logics if needed. Then for the ProcessFunction we might have a similar interface to processPendingTimer, except we might need to provide Context / Collector to the ProcessFunction. Do you think this would be a good direction?

Also @Piotr

> I don't see a problem here. Interface doesn't have to reflect that, only the runtime must set the correct key context before executing the handler dealing with the processing time timers at the end of input/time.

Sorry I might not make it very clear here. I think the difficulty with supporting the setting of the currentKey is a special issue for the callback options (no matter what the interface is) since it allows users to execute logic other than the one registered with the timers. The complexity comes from that currently we have two level of TimerServices: The ProcessingTimerService (there is no key) and InternalTimerService (with key). Currently only ProcessingTimerService is exposed to the runtime and InternalTimerService is much more a utility to implement the operator. Then with the current code, the runtime could only access to ProcessingTimerService on termination. If we only executes some predefined actions, we do not need to worry about the implementation of InternalTimerService and just execute the registered timers. But if we allow users to execute arbitrary logic, we need to be also aware of the InternalTimerServices and parse the key from the timers stored in it. I think we should always have method to overcome this issue, but to support the callback options would be more complex.

Best,
Yun Gao

------------------------------------------------------------------
From: Divye Kapoor <dk...@pinterest.com.INVALID>
Send Time: 2022 Nov. 24 (Thu.) 08:50
To: dev <de...@flink.apache.org>
Cc: Xenon Development Team <xe...@pinterest.com>
Subject: Re: Re: [DISCUSS] FLIP-269: Properly Handling the Processing Timers on Job Termination

Sounds good. Looks like we're on the same page. Thanks!

Divye

On Wed, Nov 23, 2022 at 2:41 AM Piotr Nowojski <pn...@apache.org> wrote:

Hi Divye

I think we are mostly on the same page. Just to clarify/rephrase:

> One thing to think about - on EOF “trigger immediately” will mean that the asynchronous wait timeout timers will also fire - which is undesirable

I didn't mean to fire all timers immediately in all of the built-in operators. Just that each built-in operator can have a hard coded way (without a way for users to change it) to handle those timers. Windowed operators would trigger the lingering timers (flush outputs), AsyncWaitOperator could just ignore them. The same way users could register EOF timer handlers in the ProcessFunction as Dawid Wysakowicz proposed, we (as flink developers) could use the same mechanism to implement any behaviour we want for the built-in operators. There should be no need to add any separate mechanism.

Best, Piotrek

Wed, 23 Nov 2022 at 08:21 Divye Kapoor <dk...@pinterest.com.invalid> wrote:

Thanks Yun/Piotrek, Some brief comments inline below.

On Tue, Nov 22, 2022 at 1:37 AM Piotr Nowojski <pn...@apache.org> wrote:

> Hi, All in all I would agree with Dawid's proposal.

+1

> We can add the flexibility of how to deal with the timers in the low level API via adding a handler - if someone needs to customize it, he will always have a workaround. Note after giving it more thought, I agree that registering some handlers is better than overloading the register timer method and modifying the timer's state.

+1.

> At the same time, we can force the most sensible semantic that we think for the couple of built-in operators, which should be pretty straightforward (either ignore the timers, or fire them at once). I agree there might be some edge cases, that theoretically user might want to wait for the timer to fire naturally, but: 1. I'm not sure how common in practice this will be. If not at all, then why should we be complicating the API/system?

That’s fair. However, the specifics are very important here. One thing to think about - on EOF “trigger immediately” will mean that the asynchronous wait timeout timers will also fire - which is undesirable (because they are racing with the last async call). However, the issue is cleanly resolved by waiting for the timer to be canceled when the last event is processed. (“Wait for” case). Ignoring the timer has the least justification. Registering the handler as per Dawid’s proposal and having that handler unregister the timers on EOF makes best sense. This solution also unifies the trigger immediately case as that handler can reregister the timers for early termination.

The proposal:
1. Operator receives EOF
2. EOF timer handler triggers
3. EOF handler adjusts the registered timers for early trigger or ignore. If wait-for behavior is desired, timers are not changed. This is controlled in client code.
4. Operator waits for all timers to drain/trigger. (“Always”). There is no special handling for ignore/early trigger.
5. Operator allows job to proceed with shutdown.

The only api change needed is an EOF handler. The other agreement we need is that “Wait for” is the desired behavior in processing time and that processing time is fundamentally different from event time in this respect. (I have changed my thinking since the last mail).

> 2. We can always expand the API in the future, and let the user override the default built-in behaviour of the operators via some setter on the stream transformation (`SingleOutputStreamOperator`), or via some custom API DSL style in each of the operators separately.

This is not required. See above.

> Re forcing the same semantics for processing time timers as for event time ones - this is tempting, but indeed I see a possibility that users need to adhere to some external constraints when using processing time.

+1. As above, we should consider the 2 cases fundamentally different in this area.

Re: Yun -

> b) Another issue is that what if users use timers with different termination actions in the same operator / UDF? For example, users use some kind of timeout (like throws exception if some thing not happen after some other thing), and also some kind of window aggregation logic. In this case, without additional tags, users might not be able to distinguish which timer should be canceled and which time should be triggered ?

as above. The EOF handler makes the choice.

> 4. How could these scenarios adjust their APIs ? From the current listed scenarios, I'm more tend to that as @Dawid pointed out, there might be only one expected behavior for each scenario, thus it does not seems to need to allow users to adjust the behavior. Thus @Divye may I have a double confirmation currently do we have explicit scenarios that is expected to change the different behaviors for the same scenario?

Wait-for behavior is probably the only expected behavior and any alterations should be from the EOF handler managing the registered timers.

> Besides @Divye from the listed scenarios, I have another concern for global configuration is that for one job, different operators seems to still might have different expected behaviors. For example, A job using both Window operator and AsyncWaitOperator might have different requirements for timers on termination?

Thank you for raising this case. This changed my thinking. Based on your point, we should try and align on the “Wait-for” with EOF handler proposal. I’m withdrawing the “single-runtime-config” proposal.

Best, Divye
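
The five-step "wait-for by default, handler adjusts" proposal quoted above can be illustrated with a small simulation. Everything here is hypothetical (`EofDrainSketch`, `Action`, `EofHandler`, `shutdown`), and real waiting is replaced by computing the time at which each timer would fire; it is a sketch of the control flow only, not of any Flink API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class EofDrainSketch {
    /** What the EOF handler may decide for each pending timer. */
    enum Action { WAIT, FIRE_NOW, CANCEL }

    interface EofHandler {
        Action onPendingTimer(String name, long dueAt);
    }

    /**
     * Steps 2-4 of the proposal: ask the handler about every pending timer,
     * adjust the schedule accordingly, then drain whatever is left.
     * Returns "name@fireTime" entries in firing order.
     */
    static List<String> shutdown(Map<String, Long> pending, long now, EofHandler handler) {
        Map<String, Long> adjusted = new LinkedHashMap<>();
        for (Map.Entry<String, Long> e : pending.entrySet()) {
            switch (handler.onPendingTimer(e.getKey(), e.getValue())) {
                case FIRE_NOW:
                    adjusted.put(e.getKey(), now); // re-registered for early trigger
                    break;
                case WAIT:
                    adjusted.put(e.getKey(), Math.max(now, e.getValue())); // left unchanged
                    break;
                case CANCEL:
                    break; // unregistered, never fires
            }
        }
        // The operator always waits for the remaining timers, in time order.
        List<Map.Entry<String, Long>> order = new ArrayList<>(adjusted.entrySet());
        order.sort(Map.Entry.comparingByValue());
        List<String> fired = new ArrayList<>();
        for (Map.Entry<String, Long> e : order) {
            fired.add(e.getKey() + "@" + e.getValue());
        }
        return fired;
    }
}
```

For example, a handler that returns FIRE_NOW for a window timer, WAIT for an async timeout and CANCEL for a heartbeat would flush the window at the current time, let the timeout run to its original deadline, and never fire the heartbeat.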