Posted to user@flink.apache.org by Yassine MARZOUGUI <y....@mindlytix.com> on 2017/03/17 14:14:08 UTC

ProcessingTimeTimer in ProcessFunction after a savepoint

Hi all,

How does the processing time timer behave when a job is taken down with a
savepoint and then restarted after the timer was supposed to fire? Will the
timer fire at restart because it was missed during the savepoint?

I'm asking because I would like to schedule periodic processing-time timers
at which some state is read and emitted, but I'm afraid a timer will never
fire if it comes due while the job is down, and that the state will therefore
never be emitted.

Best,
Yassine
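
To make the concern concrete, here is a minimal plain-Java sketch (not Flink code) of how a periodic processing-time timer could be re-armed. It assumes that a timer which came due while the job was down fires once right after the restore, as the replies in this thread describe. The class and method names are hypothetical. The next firing time is computed from the current time and not from the missed timestamp, so a long downtime leads to one late emission instead of a burst of catch-up firings.

```java
// Hypothetical helper for re-arming a periodic timer; not part of any Flink API.
final class PeriodicTimerSketch {

    /**
     * Returns the next firing time strictly after {@code now}, aligned to the
     * grid firstFiring + k * period.
     */
    static long nextFiring(long firstFiring, long period, long now) {
        if (period <= 0) {
            throw new IllegalArgumentException("period must be positive");
        }
        if (now < firstFiring) {
            return firstFiring; // the first firing is still ahead
        }
        long elapsedPeriods = (now - firstFiring) / period;
        return firstFiring + (elapsedPeriods + 1) * period;
    }

    public static void main(String[] args) {
        // Timer fired on time at t=61000: the next one is one period later.
        System.out.println(nextFiring(1_000, 60_000, 61_000));  // prints 121000
        // Timer fired late at t=400000 after a restore: skip the missed slots.
        System.out.println(nextFiring(1_000, 60_000, 400_000)); // prints 421000
    }
}
```

In a timer callback, the returned value would be the timestamp passed to the next timer registration.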

Re: ProcessingTimeTimer in ProcessFunction after a savepoint

Posted by Aljoscha Krettek <al...@apache.org>.
Thanks, please do keep me posted!

> On 20 Mar 2017, at 21:50, Florian König <fl...@micardo.com> wrote:
> 
> @Aljoscha Thank you for the pointer to ProcessFunction. That looks like a better approach with less code and other overhead.
> 
> After restoring, the job is both reading new elements and emitting some, but nowhere near as many as expected. I’ll investigate further after switching to ProcessFunction. I suspect that there is some problem with my code. I’ll let you know if any unexplained discrepancy remains.
> 


Re: ProcessingTimeTimer in ProcessFunction after a savepoint

Posted by Florian König <fl...@micardo.com>.
@Aljoscha Thank you for the pointer to ProcessFunction. That looks like a better approach with less code and other overhead.

After restoring, the job is both reading new elements and emitting some, but nowhere near as many as expected. I’ll investigate further after switching to ProcessFunction. I suspect that there is some problem with my code. I’ll let you know if any unexplained discrepancy remains.

> Am 20.03.2017 um 14:15 schrieb Aljoscha Krettek <al...@apache.org>:
> 
> As a general remark, I think the ProcessFunction (https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/process_function.html) could be better suited for implementing such a use case.
> 
> I ran tests on Flink 1.2 and master with a simple processing-time windowing job. After taking a savepoint and waiting a few minutes, I restored the job, and the windows that were still pending fired immediately.
> 
> In your case, after restoring, is the job also reading new elements, or did you try restoring without any new input?
> 



Re: ProcessingTimeTimer in ProcessFunction after a savepoint

Posted by Aljoscha Krettek <al...@apache.org>.
As a general remark, I think the ProcessFunction (https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/process_function.html) could be better suited for implementing such a use case.

I ran tests on Flink 1.2 and master with a simple processing-time windowing job. After taking a savepoint and waiting a few minutes, I restored the job, and the windows that were still pending fired immediately.

In your case, after restoring, is the job also reading new elements, or did you try restoring without any new input?
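
As a rough illustration of why a ProcessFunction-style approach can get by with less code, here is a plain-Java model of the per-entity logic. It is a sketch under assumptions, not Flink code: the map stands in for keyed state, the value returned by onElement is the timestamp a caller would hand to a processing-time timer registration, and every name is hypothetical. Timers are never deleted. A timer that fires is simply ignored when its timestamp no longer matches the stored plan. The sketch also assumes that the plan is cleared once it has fired, which the thread does not specify.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of per-entity scheduling logic; all names are hypothetical.
final class EarliestScheduleModel {

    static final long NO_TIMER = -1L;
    static final long TOLERANCE_MS = 100L;

    // Stands in for keyed state: entity ID -> currently planned trigger time.
    private final Map<Integer, Long> plannedEnd = new HashMap<>();
    private final List<String> emitted = new ArrayList<>();

    /** Returns the timestamp of the timer the caller should register, or NO_TIMER. */
    long onElement(int entityId, long triggerTime, long now) {
        Long planned = plannedEnd.get(entityId);
        if (planned != null && triggerTime > planned) {
            return NO_TIMER; // later than the current plan: ignore
        }
        if (triggerTime > now + TOLERANCE_MS) {
            plannedEnd.put(entityId, triggerTime); // older timers become stale
            return triggerTime;
        }
        plannedEnd.remove(entityId);
        emitted.add(entityId + "@" + now); // due (almost) now: emit immediately
        return NO_TIMER;
    }

    /** Called for every timer that fires, including stale ones. */
    void onTimer(int entityId, long timestamp) {
        Long planned = plannedEnd.get(entityId);
        if (planned == null || planned != timestamp) {
            return; // stale timer: the plan was moved or has already fired
        }
        plannedEnd.remove(entityId);
        emitted.add(entityId + "@" + timestamp);
    }

    List<String> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        EarliestScheduleModel model = new EarliestScheduleModel();
        model.onElement(1, 10_000, 0);       // plan entity 1 for t=10000
        model.onElement(1, 7_000, 4_000);    // move the plan to t=7000
        model.onTimer(1, 7_000);             // matches the plan: emits
        model.onTimer(1, 10_000);            // stale: ignored
        System.out.println(model.emitted()); // prints [1@7000]
    }
}
```

Because the stored plan decides whether a firing timer counts, restoring the plan together with the timers is enough for the logic to resume after a restart.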

> On 19 Mar 2017, at 13:15, Florian König <fl...@micardo.com> wrote:
> 
> @Aljoscha: We’re using 1.2.
> 


Re: ProcessingTimeTimer in ProcessFunction after a savepoint

Posted by Florian König <fl...@micardo.com>.
@Aljoscha: We’re using 1.2.

The intention of our code is as follows: The events that flow through Flink represent scheduling decisions, i.e. they contain the ID of a target entity, a description of an action that should be performed on that entity by some other job, and a timestamp of when that should happen.

We’re using the windowing mechanism to delay those events until they should be forwarded (and trigger the corresponding action). Furthermore, the schedule can be moved closer to the current point in time: subsequent schedule events for an entity (identified by its ID) can set the trigger time to an earlier instant. If the trigger time is in the past or only shortly (e.g., 100 ms) after the current time, the action should be triggered immediately. Events that schedule an action for an instant later than the currently planned one should be ignored, i.e. the schedule cannot be moved into the future.

Example event stream:
	time … (ID, action, trigger time)	// intended reaction
	0 … (1, 'foo', 10)		// trigger action 'foo' on entity 1 at time 10
	3 … (2, 'bar', 15)		// trigger action 'bar' on entity 2 at time 15
	4 … (1, 'foo', 7)		// move trigger back to time 7
	9 … (1, 'foo', 12)		// ignore
	15 … (2, 'bar', 15)		// trigger immediately

resulting stream:
	(1, 'foo', 7)		// at time 7
	(2, 'bar', 15)		// at time 15
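
The rule described above can be sketched as a small pure function in plain Java. This is only an illustration with hypothetical names. The currently planned trigger time is passed in explicitly (null when nothing is planned), and the tolerance is a parameter so that the small time units of the example can be used with a tolerance of 0.

```java
// Hypothetical decision rule for one incoming schedule event; not Flink code.
final class ScheduleRuleSketch {

    enum Decision { SCHEDULE, FIRE_NOW, IGNORE }

    /**
     * @param plannedEnd currently planned trigger time, or null if none is planned
     * @param newEnd     trigger time carried by the incoming event
     * @param now        current processing time
     * @param tolerance  events due within this span after now fire immediately
     */
    static Decision decide(Long plannedEnd, long newEnd, long now, long tolerance) {
        boolean firstOrEarlier = plannedEnd == null || newEnd <= plannedEnd;
        if (!firstOrEarlier) {
            return Decision.IGNORE; // the schedule cannot move into the future
        }
        return newEnd > now + tolerance ? Decision.SCHEDULE : Decision.FIRE_NOW;
    }

    public static void main(String[] args) {
        System.out.println(decide(null, 10, 0, 0)); // time 0, (1, 'foo', 10)
        System.out.println(decide(10L, 7, 4, 0));   // time 4, (1, 'foo', 7)
        System.out.println(decide(7L, 12, 9, 0));   // time 9, (1, 'foo', 12)
        System.out.println(decide(15L, 15, 15, 0)); // time 15, (2, 'bar', 15)
    }
}
```

The third call assumes that the plan for time 7 is still recorded at time 9. Whether that holds depends on when the stored plan is cleared after firing.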

To implement this, we have written a custom trigger that’s called from the following Flink code:

…
schedules.keyBy(schedule -> schedule.entityId)
		.window(GlobalWindows.create())
		.trigger(DynamicEarliestWindowTrigger.create())
		.fold((Schedule) null, (folded, schedule) -> schedule)
		.map( /* process schedules */ )
…

We fold the scheduling events 'to themselves', because only the latest event in each period is relevant. The custom trigger is implemented as follows (only Flink-relevant parts and syntax):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;

// Timestamped (not shown) provides getTimestamp(); the create() factory is omitted here.
public class DynamicEarliestWindowTrigger<T extends Timestamped, W extends Window> extends Trigger<T, W> {

	// planned firing time of the window, kept in partitioned trigger state
	private final ValueStateDescriptor<Long> windowEnd = new ValueStateDescriptor<>("windowEnd", Long.class);

	@Override
	public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception {
		ValueState<Long> windowEndState = ctx.getPartitionedState(windowEnd);
		Long windowEndsAt = windowEndState.value();
		long newEnd = element.getTimestamp();

		// no timer set yet, or intention to trigger earlier
		if (windowEndsAt == null || newEnd <= windowEndsAt) {
			deleteCurrentTimer(ctx);

			// trigger time far enough from now => schedule timer
			if (newEnd > System.currentTimeMillis() + 100) {
				ctx.registerProcessingTimeTimer(newEnd);
				windowEndState.update(newEnd);
			} else {
				return TriggerResult.FIRE;	// close enough => fire immediately
			}
		}

		// ignore events that should be triggered in the future
		return TriggerResult.CONTINUE;
	}

	// fire when timer has reached pre-set time
	@Override
	public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
		return TriggerResult.FIRE_AND_PURGE;
	}

	// noop: event time is not used
	@Override
	public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {
		return TriggerResult.CONTINUE;
	}

	@Override
	public void clear(W window, TriggerContext ctx) throws Exception {
		deleteCurrentTimer(ctx);
	}

	// deletes the registered timer (if any) and clears the stored firing time
	private void deleteCurrentTimer(TriggerContext ctx) throws Exception {
		ValueState<Long> windowEndState = ctx.getPartitionedState(windowEnd);
		Long windowEndsAt = windowEndState.value();

		if (windowEndsAt != null) {
			ctx.deleteProcessingTimeTimer(windowEndsAt);
			windowEndState.clear();
		}
	}

	@Override
	public boolean canMerge() { return false; }
}

The job state grows with the number of scheduled entities, and the mechanism works as intended as long as the job runs. However, for unrelated reasons, the job sometimes fails and is restarted from a checkpoint. The state size after the restore tells me that the state has been restored.

Yet the mechanism stops working, and none of the old scheduling events that must have been 'waiting' in the window for the timer to fire are actually forwarded. Hence my question: is it possible that timers are not restored?

Any ideas what might have gone wrong? Is there a better way to implement such a mechanism?

Thanks and enjoy the rest of your weekend :)
Florian


> Am 17.03.2017 um 16:51 schrieb Aljoscha Krettek <al...@apache.org>:
> 
> When restoring, processing-time timers that would have fired already should immediately fire.
> 
> @Florian what Flink version are you using? In Flink 1.1 there was a bug that led to processing-time timers not being reset when restoring.



Re: ProcessingTimeTimer in ProcessFunction after a savepoint

Posted by Aljoscha Krettek <al...@apache.org>.
When restoring, processing-time timers that would have fired already should immediately fire.
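
The behaviour described above can be pictured with a toy model in plain Java. It is not Flink's implementation, and all names are hypothetical. Pending timers are part of the snapshot, and when processing time is advanced after a restore, every timer whose timestamp is already in the past fires right away.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Toy model of a processing-time timer queue; not Flink's implementation.
final class TimerRestoreModel {

    private final PriorityQueue<Long> timers = new PriorityQueue<>();

    void register(long timestamp) {
        timers.add(timestamp);
    }

    /** A snapshot is a copy of all timers that are still pending. */
    List<Long> snapshot() {
        return new ArrayList<>(timers);
    }

    /** Rebuilds the queue from a snapshot, as a restore would. */
    static TimerRestoreModel restore(List<Long> snapshot) {
        TimerRestoreModel restored = new TimerRestoreModel();
        restored.timers.addAll(snapshot);
        return restored;
    }

    /** Fires, in timestamp order, every timer that is due at {@code now}. */
    List<Long> advanceTo(long now) {
        List<Long> fired = new ArrayList<>();
        while (!timers.isEmpty() && timers.peek() <= now) {
            fired.add(timers.poll());
        }
        return fired;
    }

    public static void main(String[] args) {
        TimerRestoreModel running = new TimerRestoreModel();
        running.register(900);
        running.register(100);
        running.register(200);
        List<Long> savepoint = running.snapshot(); // taken before any timer is due

        // The job is down while t=100 and t=200 pass and comes back at t=500.
        TimerRestoreModel restored = TimerRestoreModel.restore(savepoint);
        System.out.println(restored.advanceTo(500));  // prints [100, 200]
        System.out.println(restored.advanceTo(1000)); // prints [900]
    }
}
```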

@Florian what Flink version are you using? In Flink 1.1 there was a bug that led to processing-time timers not being reset when restoring.
> On 17 Mar 2017, at 15:39, Florian König <fl...@micardo.com> wrote:
> 
> Hi,
> 
> funny coincidence, I was just about to ask the same thing. I have noticed this with restored checkpoints in one of my jobs. The timers seem to be gone. My window trigger registers a processing-time timer, but such timers don’t seem to get restored, even if they are set to fire at a time after the restore.
> 
> Is there something I need to be aware of in my class implementing Trigger? Anything I forgot to set in a method that’s being called upon a restore?
> 
> Thanks
> Florian
> 


Re: ProcessingTimeTimer in ProcessFunction after a savepoint

Posted by Florian König <fl...@micardo.com>.
Hi,

funny coincidence, I was just about to ask the same thing. I have noticed this with restored checkpoints in one of my jobs. The timers seem to be gone. My window trigger registers a processing-time timer, but such timers don’t seem to get restored, even if they are set to fire at a time after the restore.

Is there something I need to be aware of in my class implementing Trigger? Anything I forgot to set in a method that’s being called upon a restore?

Thanks
Florian

> Am 17.03.2017 um 15:14 schrieb Yassine MARZOUGUI <y....@mindlytix.com>:
> 
> Hi all,
> 
> How does the processing time timer behave when a job is taken down with a savepoint and then restarted after the timer was supposed to fire? Will the timer fire at restart because it was missed during the savepoint?
> 
> I'm asking because I would like to schedule periodic processing-time timers at which some state is read and emitted, but I'm afraid a timer will never fire if it comes due while the job is down, and that the state will therefore never be emitted.
> 
> Best,
> Yassine