You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Piotr Nowojski (JIRA)" <ji...@apache.org> on 2019/06/17 11:43:00 UTC

[jira] [Created] (FLINK-12872) WindowOperator may fail with UnsupportedOperationException when merging windows

Piotr Nowojski created FLINK-12872:
--------------------------------------

             Summary: WindowOperator may fail with UnsupportedOperationException when merging windows
                 Key: FLINK-12872
                 URL: https://issues.apache.org/jira/browse/FLINK-12872
             Project: Flink
          Issue Type: Bug
          Components: API / DataStream
    Affects Versions: 1.8.0, 1.7.2, 1.6.4
            Reporter: Piotr Nowojski


[Reported |http://mail-archives.apache.org/mod_mbox/flink-user/201906.mbox/%3CCALDWsfhbP6D9+pnTzYuGaP0V4nReKJ4s9VsG_Xe1hZJq4O=z9g@mail.gmail.com%3E] by a user.

{noformat}
I have a job that uses processing time session window with inactivity gap of 60ms where I intermittently run into the following exception. I'm trying to figure out what happened here. Haven't been able to reproduce this scenario. Any thoughts?

java.lang.UnsupportedOperationException: The end timestamp of a processing-time window cannot become earlier than the current processing time by merging. Current processing time: 1560493731808 window: TimeWindow{start=1560493731654, end=1560493731778}
	at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:325)
	at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:311)
	at org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet.addWindow(MergingWindowSet.java:212)
	at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:311)
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
	at java.lang.Thread.run(Thread.java:745)
{noformat}

This is happening probably because {{System.currentTimeMillis()}} is not a monotonic function and {{WindowOperator}} accesses it at least twice: once when it creates a window and second time during performing the above mentioned check (that has failed). However I would guess there are more places like this, not only in {{WindowOperator}}.

The fix could be either to make sure that processing time is monotonic, or to access it only once per operator per record or to drop processing time in favour of ingestion time.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)