You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Abhishek Jain <ab...@gmail.com> on 2019/06/14 08:14:44 UTC

Error while using session window

Hi,
I have a job that uses processing time session window with inactivity gap
of 60ms where I intermittently run into the following exception. I'm trying
to figure out what happened here. Haven't been able to reproduce this
scenario. Any thoughts?

java.lang.UnsupportedOperationException: *The end timestamp of a
processing-time window cannot become earlier than the current
processing time by merging. Current processing time: 1560493731808
window: TimeWindow{start=1560493731654, end=1560493731778}*
	at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:325)
	at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:311)
	at org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet.addWindow(MergingWindowSet.java:212)
	at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:311)
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
	at java.lang.Thread.run(Thread.java:745)


-- 
Warm Regards,
Abhishek Jain

Re: Error while using session window

Posted by Piotr Nowojski <pi...@ververica.com>.
Hi,

Thanks for reporting the issue. I think this might be caused by System.currentTimeMillis() not being monotonic [1] and the fact Flink is accessing this function per element multiple times (at least twice: first for creating a window, second to perform the check that has failed in your case), however I’m pretty sure that this is more general problem in more places.I have created a ticket for this. [2]

I’m not sure if there is an easy hot fix for that. You would have to increase inactivity gap, switch to ingestion/even time (anyway preferable), make sure that machine’s time doesn’t change or just ignore the problem and accept some failure from time to time.

Piotrek

[1] https://stackoverflow.com/questions/2978598/will-system-currenttimemillis-always-return-a-value-previous-calls <https://stackoverflow.com/questions/2978598/will-system-currenttimemillis-always-return-a-value-previous-calls>
[2] https://issues.apache.org/jira/browse/FLINK-12872 <https://issues.apache.org/jira/browse/FLINK-12872>

> On 14 Jun 2019, at 10:14, Abhishek Jain <ab...@gmail.com> wrote:
> 
> Hi,
> I have a job that uses processing time session window with inactivity gap of 60ms where I intermittently run into the following exception. I'm trying to figure out what happened here. Haven't been able to reproduce this scenario. Any thoughts?
> 
> java.lang.UnsupportedOperationException: The end timestamp of a processing-time window cannot become earlier than the current processing time by merging. Current processing time: 1560493731808 window: TimeWindow{start=1560493731654, end=1560493731778}
> 	at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:325)
> 	at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:311)
> 	at org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet.addWindow(MergingWindowSet.java:212)
> 	at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:311)
> 	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
> 	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> 	at java.lang.Thread.run(Thread.java:745)
> 
> -- 
> Warm Regards,
> Abhishek Jain
>