Posted to user@flink.apache.org by Vishal Santoshi <vi...@gmail.com> on 2021/04/21 23:05:48 UTC

event-time window cannot become earlier than the current watermark by merging

Hey folks,
I had a pipe with sessionization that restarts and then fails after
retries with this exception. The only thing I had done was to increase
the lateness by 12 hours (to a day) in this pipe and restart from a
savepoint (SP), and it ran for 12 hours plus without issue. I cannot
imagine that increasing the lateness created this, and the way I solved
it was to increase the lateness further. Could this happen if there are
TMs in the cluster whose time is off (as in not synchronized)?

2021-04-21 11:27:58
java.lang.UnsupportedOperationException: The end timestamp of an event-time
window cannot become earlier than the current watermark by merging. Current
watermark: 1618966593999 window: TimeWindow{start=1618878336107,
end=1618880140466}
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:339)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:321)
    at org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet.addWindow(MergingWindowSet.java:209)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:319)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:191)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:204)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:396)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
    at java.lang.Thread.run(Thread.java:748)
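
For reference, the two numbers in the exception message above can be compared
with a few lines of plain Java. This is only a sketch: the class and method
names are made up for illustration, the 24-hour lateness is taken from the
description above, and the comparison is a simplified stand-in for whatever
Flink checks internally (which may, for example, use the window's maximum
timestamp rather than its end).

```java
import java.time.Duration;

public class WatermarkGap {
    // Values copied from the exception message above.
    static final long WATERMARK = 1618966593999L;
    static final long WINDOW_END = 1618880140466L;

    /** How far the window end lies behind the watermark. */
    static Duration gap(long watermark, long windowEnd) {
        return Duration.ofMillis(watermark - windowEnd);
    }

    /** True if the window end plus the allowed lateness is not ahead of the watermark. */
    static boolean beyondLateness(long watermark, long windowEnd, Duration allowedLateness) {
        return windowEnd + allowedLateness.toMillis() <= watermark;
    }

    public static void main(String[] args) {
        System.out.println("gap in ms: " + gap(WATERMARK, WINDOW_END).toMillis());
        System.out.println("beyond 24h lateness: "
                + beyondLateness(WATERMARK, WINDOW_END, Duration.ofHours(24)));
        System.out.println("beyond 48h lateness: "
                + beyondLateness(WATERMARK, WINDOW_END, Duration.ofHours(48)));
    }
}
```

With these inputs the gap is 86,453,533 ms, i.e. 24 hours plus roughly 54
seconds, so the window end had only just fallen outside a 24-hour lateness
when the exception was thrown.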

Re: event-time window cannot become earlier than the current watermark by merging

Posted by Vishal Santoshi <vi...@gmail.com>.
The only thing I can think of is to add the configured lateness to the
filter, as below, so that the time on the element + lateness is always
greater than the current WM. The current issue is

Mon Apr 19 20:46:20 EDT 2021  Window end

Wed Apr 21 21:09:02 EDT 2021  WM


An event forced this merged window, and it likely has a timestamp of
Mon Apr 19 20:46:20 EDT 2021. We are filtering this event out so as not to hit
https://github.com/aljoscha/flink/blob/2836eccc8498de7a1cad083e6102944471bbd350/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java#L125
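
As a cross-check, the two dates above match the epoch-millisecond values in
the second exception quoted further down in this message (watermark
1619053742129, window end 1618879580402). A minimal sketch of the conversion,
assuming "EDT" means US Eastern time; the class and method names are
hypothetical:

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;

public class EpochToEastern {
    // Assumption: "EDT" in the message refers to the America/New_York zone.
    static final ZoneId EASTERN = ZoneId.of("America/New_York");

    /** Converts epoch milliseconds to a date-time in US Eastern time. */
    static ZonedDateTime toEastern(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis).atZone(EASTERN);
    }

    public static void main(String[] args) {
        System.out.println("window end: " + toEastern(1618879580402L));
        System.out.println("watermark:  " + toEastern(1619053742129L));
    }
}
```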


Either way the solution is yucky, and I am not sure how it happened in the
first place.


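import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// KEY and VALUE are assumed to be type parameters of the enclosing class.
public class LateEventFilter extends ProcessFunction<KeyedTimedValue<KEY, VALUE>,
        KeyedTimedValue<KEY, VALUE>> {
    private static final long serialVersionUID = 1L;

    // Allowed lateness in milliseconds.
    long allowedLateness;

    public LateEventFilter(long allowedLateness) {
        this.allowedLateness = allowedLateness;
    }

    @Override
    public void processElement(KeyedTimedValue<KEY, VALUE> value, Context ctx,
            Collector<KeyedTimedValue<KEY, VALUE>> out) throws Exception {
        // Forward the element only while timestamp + lateness is ahead of the watermark.
        if (ctx.timestamp() + allowedLateness > ctx.timerService().currentWatermark()) {
            out.collect(value);
        }
    }
}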
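
The comparison used by the filter can be tried in isolation, without a Flink
runtime. The sketch below is a hypothetical helper, not part of the pipeline:
it uses the watermark from the second exception quoted below and, as a
stand-in for an element timestamp, the window start from that same exception.

```java
public class LatenessPredicate {
    /** Same comparison as in LateEventFilter: keep the element while it is within the lateness. */
    static boolean keep(long elementTimestamp, long allowedLatenessMs, long watermark) {
        return elementTimestamp + allowedLatenessMs > watermark;
    }

    public static void main(String[] args) {
        long twoDays = 48L * 60 * 60 * 1000;    // lateness of 2 days, in milliseconds
        long watermark = 1619053742129L;        // watermark from the second exception
        long oldTimestamp = 1618877773663L;     // window start, used here as an element timestamp

        System.out.println(keep(oldTimestamp, twoDays, watermark));            // false
        System.out.println(keep(watermark - twoDays, twoDays, watermark));     // false (boundary)
        System.out.println(keep(watermark - twoDays + 1, twoDays, watermark)); // true
    }
}
```

An element exactly `allowedLateness` behind the watermark is dropped; one
millisecond newer is kept. A function with the reversed predicate receives
exactly the elements this one drops.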

On Thu, Apr 22, 2021 at 8:52 AM Vishal Santoshi <vi...@gmail.com>
wrote:

> I saw
> https://stackoverflow.com/questions/57334257/the-end-timestamp-of-an-event-time-window-cannot-become-earlier-than-the-current.
> and this seems to suggest a straight-up filter, but I am not sure how
> that filter works, as in would it factor in the lateness when filtering?
>
> On Thu, Apr 22, 2021 at 8:24 AM Vishal Santoshi <vi...@gmail.com>
> wrote:
>
>> Well it was not a solution after all. We now have a session window that
>> is stuck with the same issue, albeit after the additional lateness. I had
>> increased the lateness to 2 days and that masked the issue, which again
>> reared its head after the 2 days of lateness were over (instead of the
>> 1 day before). This is very disconcerting.
>>
>> Caused by: java.lang.UnsupportedOperationException: The end timestamp of
>> an event-time window cannot become earlier than the current watermark by
>> merging. Current watermark: 1619053742129 window:
>> TimeWindow{start=1618877773663, end=1618879580402}

Re: event-time window cannot become earlier than the current watermark by merging

Posted by Vishal Santoshi <vi...@gmail.com>.
Great, thanks for the update. The upfront filter does work and has for the
last 24 hours, and there is no reason why it should not.

Again I have to note that there is no mail group that has been this
responsive to issues, so thank you again.



On Fri, Apr 23, 2021 at 4:34 AM Matthias Pohl <ma...@ververica.com>
wrote:

> After having talked to David about this issue offline, I decided to create
> a Jira ticket FLINK-22425 [1] to cover this. Thanks for reporting it on the
> mailing list, Vishal. Hopefully, the community has the chance to look into
> it.
>
> Best,
> Matthias
>
> [1] https://issues.apache.org/jira/browse/FLINK-22425
>
> On Fri, Apr 23, 2021 at 8:16 AM Matthias Pohl <ma...@ververica.com>
> wrote:
>
>> To me, it sounds strange. I would have expected it to work with
>> `allowedLateness` and `sideOutput` being defined. I'll pull in David to
>> have a look at it. Maybe he has some more insights. I haven't worked that
>> much with lateness yet.
>>
>> Matthias
>>
>> On Thu, Apr 22, 2021 at 10:57 PM Vishal Santoshi <
>> vishal.santoshi@gmail.com> wrote:
>>
>>>  <<  Added the Filter upfront as below, the pipe has no issues. Also
>>> metrics show that no data is being pushed through the sideoutput and that
>>> data is not pulled from the simulated sideout ( below )
>>>
>>> >> Added the Filter upfront as below, the pipe has no issues. Also
>>> metrics show that no data is being pushed through the sideoutput and that
>>> data is *now* pulled from the simulated sideout, essentially the
>>> Process Function with a reverse predicate to the Filter Process Function.
>>>
>>>
>>> On Thu, Apr 22, 2021 at 1:56 PM Vishal Santoshi <
>>> vishal.santoshi@gmail.com> wrote:
>>>
>>>> And when I added the filter the Exception was not thrown. So the
>>>> sequence of events was:
>>>>
>>>> * Increased lateness from 12 hours (what it was initially running
>>>> with) to 24 hours.
>>>> * The pipe ran as desired before it blew up with the Exception.
>>>> * Masked the issue by increasing the lateness to 48 hours.
>>>> * It blew up again, but now after the added lateness, so essentially the
>>>> same issue, but the added lateness let the pipe run for another few hours.
>>>> * Added the Filter upfront as below; the pipe has no issues. Also
>>>> metrics show that no data is being pushed through the sideoutput and that
>>>> data is not pulled from the simulated sideout (below).
>>>>
>>>>
>>>> public class LateEventFilter extends ProcessFunction<KeyedTimedValue<KEY, VALUE>,
>>>>         KeyedTimedValue<KEY, VALUE>> {
>>>>     private static final long serialVersionUID = 1L;
>>>>
>>>>     long allowedLateness;
>>>>
>>>>     public LateEventFilter(long allowedLateness) {
>>>>         this.allowedLateness = allowedLateness;
>>>>     }
>>>>
>>>>     @Override
>>>>     public void processElement(KeyedTimedValue<KEY, VALUE> value, Context ctx,
>>>>             Collector<KeyedTimedValue<KEY, VALUE>> out) throws Exception {
>>>>         if (ctx.timestamp() + allowedLateness > ctx.timerService().currentWatermark()) {
>>>>             out.collect(value);
>>>>         }
>>>>     }
>>>> }
>>>>
>>>>
>>>> public class LateEventSideOutput extends ProcessFunction<KeyedTimedValue<KEY, VALUE>,
>>>>         KeyedTimedValue<KEY, VALUE>> {
>>>>     private static final long serialVersionUID = 1L;
>>>>
>>>>     long allowedLateness;
>>>>
>>>>     public LateEventSideOutput(long allowedLateness) {
>>>>         this.allowedLateness = allowedLateness;
>>>>     }
>>>>
>>>>     @Override
>>>>     public void processElement(KeyedTimedValue<KEY, VALUE> value, Context ctx,
>>>>             Collector<KeyedTimedValue<KEY, VALUE>> out) throws Exception {
>>>>         if (ctx.timestamp() + allowedLateness <= ctx.timerService().currentWatermark()) {
>>>>             out.collect(value);
>>>>         }
>>>>     }
>>>> }
>>>>
>>>>
>>>>
>>>>  I am using RocksDB as a backend if that helps.
>>>>
>>>> On Thu, Apr 22, 2021 at 1:50 PM Vishal Santoshi <
>>>> vishal.santoshi@gmail.com> wrote:
>>>>
>>>>> Yes sir. The allowedLateNess and side output always existed.
>>>>>
>>>>> On Thu, Apr 22, 2021 at 11:47 AM Matthias Pohl <ma...@ververica.com>
>>>>> wrote:
>>>>>
>>>>>> You're saying that you used `allowedLateness`/`sideOutputLateData` as
>>>>>> described in [1] but without the `LateEventFilter`/`LateEventSideOutput`
>>>>>> being added to your pipeline when running into the
>>>>>> UnsupportedOperationException issue previously?
>>>>>>
>>>>>> [1]
>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/windows.html#getting-late-data-as-a-side-output
>>>>>>
>>>>>> On Thu, Apr 22, 2021 at 5:32 PM Vishal Santoshi <
>>>>>> vishal.santoshi@gmail.com> wrote:
>>>>>>
>>>>>>> As in, this is essentially doing what lateness *should* have done,
>>>>>>> and I think that is a bug. My code is now as below. Please look at
>>>>>>> the allowedLateness on the session window.
>>>>>>>
>>>>>>> SingleOutputStreamOperator<KeyedTimedValue<KEY, VALUE>> filteredKeyedValue = keyedValue
>>>>>>>     .process(new LateEventFilter(this.lateNessInMinutes * 60 * 1000L))
>>>>>>>     .name("late_filter").uid("late_filter");
>>>>>>> SingleOutputStreamOperator<KeyedTimedValue<KEY, VALUE>> lateKeyedValue = keyedValue
>>>>>>>     .process(new LateEventSideOutput(this.lateNessInMinutes * 60 * 1000L))
>>>>>>>     .name("late_data").uid("late_data");
>>>>>>> SingleOutputStreamOperator<KeyedSessionWithSessionID<KEY, VALUE>> aggregate = filteredKeyedValue
>>>>>>>     .filter((f) -> f.key != null && f.timedValue.getEventTime() != null)
>>>>>>>     .keyBy(value -> value.getKey())
>>>>>>>     .window(EventTimeSessionWindows.withGap(Time.minutes(gapInMinutes)))
>>>>>>>     .allowedLateness(Time.minutes(lateNessInMinutes))
>>>>>>>     .sideOutputLateData(lateOutputTag)
>>>>>>>     .trigger(PurgingTrigger.of(CountTrigger.of(1)))
>>>>>>>     .aggregate(new SortAggregate<KEY, VALUE>(),
>>>>>>>         new SessionIdProcessWindowFunction<KEY, VALUE>(this.gapInMinutes,
>>>>>>>             this.lateNessInMinutes))
>>>>>>>     .name("session_aggregate").uid("session_aggregate");
>>>>>>>
>>>>>>> On Thu, Apr 22, 2021 at 9:59 AM Vishal Santoshi <
>>>>>>> vishal.santoshi@gmail.com> wrote:
>>>>>>>
>>>>>>>> I can do that, but I am not certain this is the right filter. Can
>>>>>>>> you please validate? That aside, I already have the lateness
>>>>>>>> configured for the session window (the normal allowedLateness()),
>>>>>>>> and this looks like a session window was not collected and is still
>>>>>>>> alive for some reason (a Flink bug?).
>>>>>>>>
>>>>>>>> if (ctx.timestamp() + allowedLateness > ctx.timerService().currentWatermark()) {
>>>>>>>>     out.collect(value);
>>>>>>>> }
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Apr 22, 2021 at 9:46 AM Matthias Pohl <
>>>>>>>> matthias@ververica.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Vishal,
>>>>>>>>> based on the error message and the behavior you described,
>>>>>>>>> introducing a filter for late events is the way to go - just as described
>>>>>>>>> in the SO thread you mentioned. Usually, you would collect late events in
>>>>>>>>> some kind of side output [1].
>>>>>>>>>
>>>>>>>>> I hope that helps.
>>>>>>>>> Matthias
>>>>>>>>>
>>>>>>>>> [1]
>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#getting-late-data-as-a-side-output

Re: event-time window cannot become earlier than the current watermark by merging

Posted by Matthias Pohl <ma...@ververica.com>.
After having talked to David about this issue offline, I decided to create
a Jira ticket FLINK-22425 [1] to cover this. Thanks for reporting it on the
mailing list, Vishal. Hopefully, the community has the chance to look into
it.

Best,
Matthias

[1] https://issues.apache.org/jira/browse/FLINK-22425


Re: event-time window cannot become earlier than the current watermark by merging

Posted by Matthias Pohl <ma...@ververica.com>.
To me, it sounds strange. I would have expected it to work with
`allowedLateness` and `sideOutput` being defined. I'll pull in David to have
a look at it. Maybe he has some more insights. I haven't worked that much
with lateness yet.

Matthias


Re: event-time window cannot become earlier than the current watermark by merging

Posted by Vishal Santoshi <vi...@gmail.com>.
Correction to the last bullet of my previous message. It said:

  Added the filter upfront, the pipe has no issues. Also metrics show that
  no data is being pushed through the side output and that data is not
  pulled from the simulated side-out.

It should read:

  Added the filter upfront (the `LateEventFilter` from my previous message),
  and the pipe has no issues. Metrics also show that no data is being pushed
  through the side output and that data is *now* pulled from the simulated
  side-out, which is essentially the Process Function with the reverse
  predicate of the filter Process Function.



Re: event-time window cannot become earlier than the current watermark by merging

Posted by Vishal Santoshi <vi...@gmail.com>.
And when I added the filter, the exception was no longer thrown. So the
sequence of events was:

* Increased the lateness from 12 hours (what it was initially running with)
  to 24 hours.
* The pipe ran as desired before it blew up with the exception.
* Masked the issue by increasing the lateness to 48 hours.
* It blew up again, but only after the added lateness had passed. So it is
  essentially the same issue; the added lateness just let the pipe run for
  another few hours.
* Added the filter upfront (`LateEventFilter`, below), and the pipe has no
  issues. Metrics also show that no data is being pushed through the side
  output and that data is not pulled from the simulated side-out
  (`LateEventSideOutput`, below).
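
As a rough sanity check of this timeline, here is a small self-contained Java sketch. It takes the watermark and window end from the two exceptions in this thread and measures how far the watermark had moved past window end plus allowed lateness. The names `LatenessTimeline` and `overshootMs` are made up for illustration, and the comparison is only an assumption based on the wording of the exception message, not a copy of Flink's internal check.

```java
/** Plain-Java arithmetic on the numbers from the two exceptions; no Flink dependency. */
final class LatenessTimeline {
    static final long HOUR_MS = 60L * 60 * 1000;

    /**
     * Milliseconds by which the watermark exceeds windowEnd + allowedLateness.
     * A positive value means the window was already past its allowed lateness.
     */
    static long overshootMs(long watermark, long windowEnd, long allowedLatenessMs) {
        return watermark - (windowEnd + allowedLatenessMs);
    }

    public static void main(String[] args) {
        // First exception: assumed to be the run with 24 hours of lateness.
        long first = overshootMs(1_618_966_593_999L, 1_618_880_140_466L, 24 * HOUR_MS);
        // Second exception: assumed to be the run with 48 hours of lateness.
        long second = overshootMs(1_619_053_742_129L, 1_618_879_580_402L, 48 * HOUR_MS);

        System.out.println("first overshoot (ms):  " + first);
        System.out.println("second overshoot (ms): " + second);
    }
}
```

Under those assumptions the first failure comes out roughly 53.5 seconds past the 24-hour lateness and the second roughly 22.7 minutes past the 48-hour lateness, which fits the observation that each increase only postponed the exception.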


public class LateEventFilter
        extends ProcessFunction<KeyedTimedValue<KEY, VALUE>, KeyedTimedValue<KEY, VALUE>> {
    private static final long serialVersionUID = 1L;

    // Allowed lateness in milliseconds.
    long allowedLateness;

    public LateEventFilter(long allowedLateness) {
        this.allowedLateness = allowedLateness;
    }

    // Forward only events that are still within the allowed lateness.
    @Override
    public void processElement(KeyedTimedValue<KEY, VALUE> value, Context ctx,
            Collector<KeyedTimedValue<KEY, VALUE>> out) throws Exception {
        if (ctx.timestamp() + allowedLateness > ctx.timerService().currentWatermark()) {
            out.collect(value);
        }
    }
}


public class LateEventSideOutput
        extends ProcessFunction<KeyedTimedValue<KEY, VALUE>, KeyedTimedValue<KEY, VALUE>> {
    private static final long serialVersionUID = 1L;

    // Allowed lateness in milliseconds.
    long allowedLateness;

    public LateEventSideOutput(long allowedLateness) {
        this.allowedLateness = allowedLateness;
    }

    // Reverse predicate of LateEventFilter: forward only events past the allowed lateness.
    @Override
    public void processElement(KeyedTimedValue<KEY, VALUE> value, Context ctx,
            Collector<KeyedTimedValue<KEY, VALUE>> out) throws Exception {
        if (ctx.timestamp() + allowedLateness <= ctx.timerService().currentWatermark()) {
            out.collect(value);
        }
    }
}
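
The two classes above are meant to split the stream into complementary halves. A minimal plain-Java sketch of just the two predicates shows that every timestamp lands in exactly one of them, including the boundary case where timestamp plus lateness equals the watermark. It has no Flink dependency; `LatenessSplit`, `passesFilter` and `isLate` are hypothetical names, and the watermark is borrowed from the first exception with an assumed 24-hour lateness.

```java
/** Plain-Java mirror of the two predicates; no Flink dependency. */
final class LatenessSplit {

    /** Same condition as LateEventFilter: the event is still within the allowed lateness. */
    static boolean passesFilter(long eventTs, long allowedLatenessMs, long watermark) {
        return eventTs + allowedLatenessMs > watermark;
    }

    /** Same condition as LateEventSideOutput: the exact negation of passesFilter. */
    static boolean isLate(long eventTs, long allowedLatenessMs, long watermark) {
        return eventTs + allowedLatenessMs <= watermark;
    }

    public static void main(String[] args) {
        long watermark = 1_618_966_593_999L;            // watermark from the first exception
        long allowedLatenessMs = 24L * 60 * 60 * 1000;  // assumed 24 hours of lateness

        // Event timestamp at which the two predicates flip.
        long boundary = watermark - allowedLatenessMs;
        long[] samples = {boundary - 1, boundary, boundary + 1};

        for (long ts : samples) {
            boolean kept = passesFilter(ts, allowedLatenessMs, watermark);
            boolean late = isLate(ts, allowedLatenessMs, watermark);
            // Exactly one of the two flags is true for every timestamp.
            System.out.println(ts + " kept=" + kept + " late=" + late);
        }
    }
}
```

An event sitting exactly on the boundary goes to the late side, so the two operators never both emit the same event and never both drop it.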



I am using RocksDB as the state backend, if that helps.


Re: event-time window cannot become earlier than the current watermark by merging

Posted by Vishal Santoshi <vi...@gmail.com>.
Yes sir. The `allowedLateness` and the side output were always there.

On Thu, Apr 22, 2021 at 11:47 AM Matthias Pohl <ma...@ververica.com>
wrote:

> You're saying that you used `allowedLateness`/`sideOutputLateData` as
> described in [1] but without the `LateEventFilter`/`LateEventSideOutput`
> being added to your pipeline when running into the
> UnsupportedOperationException issue previously?
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/windows.html#getting-late-data-as-a-side-output
>
> On Thu, Apr 22, 2021 at 5:32 PM Vishal Santoshi <vi...@gmail.com>
> wrote:
>
>> As in, this is essentially doing what lateness *should* have done, and I
>> think that is a bug. My code is now as follows. Please look at the
>> allowedLateness on the session window.
>>
>> SingleOutputStreamOperator<KeyedTimedValue<KEY, VALUE>> filteredKeyedValue = keyedValue
>>     .process(new LateEventFilter(this.lateNessInMinutes * 60 * 1000L))
>>     .name("late_filter").uid("late_filter");
>> SingleOutputStreamOperator<KeyedTimedValue<KEY, VALUE>> lateKeyedValue = keyedValue
>>     .process(new LateEventSideOutput(this.lateNessInMinutes * 60 * 1000L))
>>     .name("late_data").uid("late_data");
>> SingleOutputStreamOperator<KeyedSessionWithSessionID<KEY, VALUE>> aggregate = filteredKeyedValue
>>     .filter((f) -> f.key != null && f.timedValue.getEventTime() != null)
>>     .keyBy(value -> value.getKey())
>>     .window(EventTimeSessionWindows.withGap(Time.minutes(gapInMinutes)))
>>     .allowedLateness(Time.minutes(lateNessInMinutes))
>>     .sideOutputLateData(lateOutputTag)
>>     .trigger(PurgingTrigger.of(CountTrigger.of(1)))
>>     .aggregate(new SortAggregate<KEY, VALUE>(),
>>         new SessionIdProcessWindowFunction<KEY, VALUE>(this.gapInMinutes, this.lateNessInMinutes))
>>     .name("session_aggregate").uid("session_aggregate");
>>
>> On Thu, Apr 22, 2021 at 9:59 AM Vishal Santoshi <
>> vishal.santoshi@gmail.com> wrote:
>>
>>> I can do that, but I am not certain this is the right filter. Can you
>>> please validate it? That aside, I already have the lateness configured for
>>> the session window (the normal allowedLateness()), and this looks like a
>>> session window was not collected and is still alive for some reason (a
>>> Flink bug?).
>>>
>>> if (ctx.timestamp() + allowedLateness > ctx.timerService().currentWatermark()) {
>>>     out.collect(value);
>>> }
>>>
>>>
>>> On Thu, Apr 22, 2021 at 9:46 AM Matthias Pohl <ma...@ververica.com>
>>> wrote:
>>>
>>>> Hi Vishal,
>>>> based on the error message and the behavior you described, introducing
>>>> a filter for late events is the way to go - just as described in the SO
>>>> thread you mentioned. Usually, you would collect late events in some kind
>>>> of side output [1].
>>>>
>>>> I hope that helps.
>>>> Matthias
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#getting-late-data-as-a-side-output
>>>>
>>>> On Thu, Apr 22, 2021 at 3:22 PM Vishal Santoshi <
>>>> vishal.santoshi@gmail.com> wrote:
>>>>
>>>>> I saw
>>>>> https://stackoverflow.com/questions/57334257/the-end-timestamp-of-an-event-time-window-cannot-become-earlier-than-the-current.
>>>>> and this seems to suggest a straight-up filter, but I am not sure how
>>>>> that filter works: would it factor in the lateness when filtering?
>>>>>
>>>>> On Thu, Apr 22, 2021 at 8:24 AM Vishal Santoshi <
>>>>> vishal.santoshi@gmail.com> wrote:
>>>>>
>>>>>> Well, it was not a solution after all. We now have a session window
>>>>>> that is stuck with the same issue, albeit after the additional lateness.
>>>>>> I had increased the lateness to 2 days, and that masked the issue, which
>>>>>> reared its head again after the 2 days of lateness were over (instead of
>>>>>> after 1 day, as before). This is very disconcerting.
>>>>>>
>>>>>> Caused by: java.lang.UnsupportedOperationException: The end
>>>>>> timestamp of an event-time window cannot become earlier than the
>>>>>> current watermark by merging. Current watermark: 1619053742129
>>>>>> window: TimeWindow{start=1618877773663, end=1618879580402}
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Wed, Apr 21, 2021 at 7:05 PM Vishal Santoshi <
>>>>>> vishal.santoshi@gmail.com> wrote:
>>>>>>
>>>>>>> Hey folks,
>>>>>>>                I had a pipe with sessionization restarts and then
>>>>>>> fail after retries with this exception. The only thing I had done was to
>>>>>>> increase the lateness by 12 hours ( to  a day )  in this pipe and restart
>>>>>>> from SP and it ran for 12 hours plus without issue. I cannot imagine that
>>>>>>> increasing the lateness created this and the way I solved this was to
>>>>>>> increase the lateness further. Could this be if there are TMs in the
>>>>>>> cluster whose time is off ( as in not synchronized )  ?
>>>>>>>
>>>>>>> 2021-04-21 11:27:58
>>>>>>> java.lang.UnsupportedOperationException: The end timestamp of an
>>>>>>> event-time window cannot become earlier than the current watermark
>>>>>>> by merging. Current watermark: 1618966593999 window: TimeWindow
>>>>>>> {start=1618878336107, end=1618880140466}
>>>>>>>     at org.apache.flink.streaming.runtime.operators.windowing.
>>>>>>> WindowOperator$2.merge(WindowOperator.java:339)
>>>>>>>     at org.apache.flink.streaming.runtime.operators.windowing.
>>>>>>> WindowOperator$2.merge(WindowOperator.java:321)
>>>>>>>     at org.apache.flink.streaming.runtime.operators.windowing.
>>>>>>> MergingWindowSet.addWindow(MergingWindowSet.java:209)
>>>>>>>     at org.apache.flink.streaming.runtime.operators.windowing.
>>>>>>> WindowOperator.processElement(WindowOperator.java:319)
>>>>>>>     at org.apache.flink.streaming.runtime.tasks.
>>>>>>> OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(
>>>>>>> OneInputStreamTask.java:191)
>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
>>>>>>> .processElement(StreamTaskNetworkInput.java:204)
>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
>>>>>>> .emitNext(StreamTaskNetworkInput.java:174)
>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor
>>>>>>> .processInput(StreamOneInputProcessor.java:65)
>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask
>>>>>>> .processInput(StreamTask.java:396)
>>>>>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.
>>>>>>> MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask
>>>>>>> .runMailboxLoop(StreamTask.java:617)
>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
>>>>>>> StreamTask.java:581)
>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755
>>>>>>> )
>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
>>>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>>>>
>>>>>>>
>>>>>>>
>>>>
>
> --
>
> Matthias Pohl | Engineer
>

Re: event-time window cannot become earlier than the current watermark by merging

Posted by Matthias Pohl <ma...@ververica.com>.
So you were already using `allowedLateness`/`sideOutputLateData` as
described in [1], but without the `LateEventFilter`/`LateEventSideOutput`
in your pipeline, when you ran into the UnsupportedOperationException
earlier?

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/windows.html#getting-late-data-as-a-side-output


-- 

Matthias Pohl | Engineer


Re: event-time window cannot become earlier than the current watermark by merging

Posted by Vishal Santoshi <vi...@gmail.com>.
As in, this filter is essentially doing what lateness *should* have done, and
I think that is a bug. My code now is below. Please look at the
allowedLateness on the session window.

SingleOutputStreamOperator<KeyedTimedValue<KEY, VALUE>> filteredKeyedValue = keyedValue
    .process(new LateEventFilter(this.lateNessInMinutes * 60 * 1000L))
    .name("late_filter").uid("late_filter");
SingleOutputStreamOperator<KeyedTimedValue<KEY, VALUE>> lateKeyedValue = keyedValue
    .process(new LateEventSideOutput(this.lateNessInMinutes * 60 * 1000L))
    .name("late_data").uid("late_data");
SingleOutputStreamOperator<KeyedSessionWithSessionID<KEY, VALUE>> aggregate = filteredKeyedValue
    .filter((f) -> f.key != null && f.timedValue.getEventTime() != null)
    .keyBy(value -> value.getKey())
    .window(EventTimeSessionWindows.withGap(Time.minutes(gapInMinutes)))
    .allowedLateness(Time.minutes(lateNessInMinutes))
    .sideOutputLateData(lateOutputTag)
    .trigger(PurgingTrigger.of(CountTrigger.of(1)))
    .aggregate(new SortAggregate<KEY, VALUE>(),
        new SessionIdProcessWindowFunction<KEY, VALUE>(this.gapInMinutes, this.lateNessInMinutes))
    .name("session_aggregate").uid("session_aggregate");


Re: event-time window cannot become earlier than the current watermark by merging

Posted by Vishal Santoshi <vi...@gmail.com>.
I can do that, but I am not certain this is the right filter. Can you
please validate? That aside, I already have the lateness configured for the
session window (the normal allowedLateness()), and this looks like a session
window was not collected and is still alive for some reason (a Flink bug?).

if (ctx.timestamp() + allowedLateness > ctx.timerService().currentWatermark()) {
    out.collect(value);
}
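
To make the question concrete, here is the condition above as a plain Java predicate, next to the window-level lateness test it is meant to approximate. This is only a sketch under assumptions: the window-level rule (a window is late once `maxTimestamp + allowedLateness <= watermark`, where a session window created for a single element spans `[ts, ts + gap)`) is assumed rather than taken from this thread, and `LatenessPredicates` with its methods is a hypothetical name, not a Flink class.

```java
// Hypothetical helper, not part of Flink: plain arithmetic only.
final class LatenessPredicates {

    // Element-level filter from the snippet above: keep the element while
    // its timestamp plus the allowed lateness is still ahead of the watermark.
    static boolean keepElement(long elementTs, long allowedLatenessMs, long watermark) {
        return elementTs + allowedLatenessMs > watermark;
    }

    // Assumed window-level rule for a fresh session window [ts, ts + gap):
    // the window counts as late once (end - 1) + allowedLateness <= watermark.
    static boolean sessionWindowIsLate(long elementTs, long gapMs,
                                       long allowedLatenessMs, long watermark) {
        long maxTimestamp = elementTs + gapMs - 1;
        return maxTimestamp + allowedLatenessMs <= watermark;
    }

    public static void main(String[] args) {
        long watermark = 1_000_000L;
        long lateness = 60_000L; // illustrative values, not from the job
        long gap = 30_000L;

        // Three element timestamps: clearly in time, borderline, clearly late.
        for (long ts : new long[] {950_000L, 930_000L, 900_000L}) {
            System.out.println("ts=" + ts
                + " keptByFilter=" + keepElement(ts, lateness, watermark)
                + " windowLate=" + sessionWindowIsLate(ts, gap, lateness, watermark));
        }
    }
}
```

Under these assumptions the element-level filter is the stricter of the two: an element can be dropped by the filter while its own session window would still have been accepted, by up to one gap.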



Re: event-time window cannot become earlier than the current watermark by merging

Posted by Matthias Pohl <ma...@ververica.com>.
Hi Vishal,
based on the error message and the behavior you described, introducing a
filter for late events is the way to go - just as described in the SO
thread you mentioned. Usually, you would collect late events in some kind
of side output [1].

I hope that helps.
Matthias

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#getting-late-data-as-a-side-output


Re: event-time window cannot become earlier than the current watermark by merging

Posted by Vishal Santoshi <vi...@gmail.com>.
I saw
https://stackoverflow.com/questions/57334257/the-end-timestamp-of-an-event-time-window-cannot-become-earlier-than-the-current.
and this seems to suggest a straight-up filter, but I am not sure how that
filter works: would it factor in the lateness when filtering?


Re: event-time window cannot become earlier than the current watermark by merging

Posted by Vishal Santoshi <vi...@gmail.com>.
Well, it was not a solution after all. We now have a session window that is
stuck with the same issue, albeit after the additional lateness. I had
increased the lateness to 2 days and that masked the issue, which reared its
head again once the 2 days of lateness were over (instead of the 1 day
before). This is very disconcerting.

Caused by: java.lang.UnsupportedOperationException: The end timestamp of an
event-time window cannot become earlier than the current watermark by
merging. Current watermark: 1619053742129
window: TimeWindow{start=1618877773663, end=1618879580402}
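
For reference, the numbers in the two exceptions from this thread can be checked by hand. The sketch below assumes that the merge guard rejects a merged window once `maxTimestamp + allowedLateness <= currentWatermark` (with `maxTimestamp` being `end - 1`), and that the lateness was 1 day at the first failure and 2 days at the second, as described in the messages. `MergeLatenessCheck` and its methods are hypothetical names used only for this arithmetic.

```java
import java.time.Duration;

// Hypothetical helper, not part of Flink: reproduces the arithmetic only.
final class MergeLatenessCheck {

    // Assumed guard: a merged window is rejected once
    // (end - 1) + allowedLateness <= currentWatermark.
    static boolean isPastLateness(long windowEnd, long allowedLatenessMs, long watermark) {
        long maxTimestamp = windowEnd - 1;
        return maxTimestamp + allowedLatenessMs <= watermark;
    }

    // How far the watermark is ahead of the window end, in milliseconds.
    static long lagMs(long windowEnd, long watermark) {
        return watermark - windowEnd;
    }

    public static void main(String[] args) {
        long oneDay = Duration.ofDays(1).toMillis();
        long twoDays = Duration.ofDays(2).toMillis();

        // First failure: lateness of 1 day.
        long end1 = 1618880140466L;
        long wm1 = 1618966593999L;
        System.out.println("first:  lagMs=" + lagMs(end1, wm1)
            + " pastLateness=" + isPastLateness(end1, oneDay, wm1));

        // Second failure: lateness of 2 days.
        long end2 = 1618879580402L;
        long wm2 = 1619053742129L;
        System.out.println("second: lagMs=" + lagMs(end2, wm2)
            + " pastLateness=" + isPastLateness(end2, twoDays, wm2));
    }
}
```

In both cases the watermark is ahead of the window end by slightly more than the configured lateness (roughly 24.01 hours against 1 day, and roughly 48.38 hours against 2 days), which matches the observation that raising the lateness only postponed the failure.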


