Posted to user@flink.apache.org by Austin Cawley-Edwards <au...@gmail.com> on 2018/10/30 19:58:15 UTC

Flink CEP Watermark Exception

Hi there,


We have a streaming application that uses CEP processing, but we are
getting this error fairly frequently after a checkpoint fails, though we
are not sure whether the two are related. We have implemented both
`hashCode()` and `equals()`, using `Objects.hash(...properties)` and basic
equality, respectively. Has anyone seen this before when using CEP?
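
A minimal sketch of what such an `equals()`/`hashCode()` pair might look like. The class shape is an assumption: the field names and sample values are taken from the JSON payload in the exception message, and the real `AlertEvent` may well differ.

```java
import java.util.Objects;

// Hypothetical event class; fields mirror the JSON payload in the exception
// message and are an assumption about the real AlertEvent.
final class AlertEvent {
    final long totalDocumentCount;
    final int alertLevel;
    final String id;
    final double predicted;
    final long timestamp;

    AlertEvent(long totalDocumentCount, int alertLevel, String id,
               double predicted, long timestamp) {
        this.totalDocumentCount = totalDocumentCount;
        this.alertLevel = alertLevel;
        this.id = id;
        this.predicted = predicted;
        this.timestamp = timestamp;
    }

    // "Basic equality": every field that feeds hashCode() is compared here too.
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof AlertEvent)) {
            return false;
        }
        AlertEvent other = (AlertEvent) o;
        return totalDocumentCount == other.totalDocumentCount
            && alertLevel == other.alertLevel
            && Double.compare(predicted, other.predicted) == 0
            && timestamp == other.timestamp
            && Objects.equals(id, other.id);
    }

    @Override
    public int hashCode() {
        return Objects.hash(totalDocumentCount, alertLevel, id, predicted, timestamp);
    }

    public static void main(String[] args) {
        AlertEvent a = new AlertEvent(12, 3, "entity:medium:13790",
                4.220480902392199, 1539700396000L);
        AlertEvent b = new AlertEvent(12, 3, "entity:medium:13790",
                4.220480902392199, 1539700396000L);
        AlertEvent later = new AlertEvent(12, 3, "entity:medium:13790",
                4.220480902392199, 1539700397000L);
        System.out.println(a.equals(b) && a.hashCode() == b.hashCode()); // true
        System.out.println(a.equals(later)); // false
    }
}
```

The point of the sketch is only that both methods draw on the same set of fields, so equal objects always hash alike.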


Here is the full exception:


java.lang.RuntimeException: Exception occurred while processing valve output watermark:
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
	at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
	at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: Could not find previous entry with key: alertOne, value: {"totalDocumentCount":12,"alertLevel":3,"id":"entity:medium:13790","predicted":4.220480902392199,"timestamp":1539700396000} and timestamp: 1539700799999. This can indicate that either you did not implement the equals() and hashCode() methods of your input elements properly or that the element belonging to that entry has been already pruned.
	at org.apache.flink.cep.nfa.SharedBuffer.put(SharedBuffer.java:107)
	at org.apache.flink.cep.nfa.NFA.computeNextStates(NFA.java:566)
	at org.apache.flink.cep.nfa.NFA.process(NFA.java:252)
	at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.processEvent(AbstractKeyedCEPPatternOperator.java:332)
	at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.lambda$onEventTime$0(AbstractKeyedCEPPatternOperator.java:235)
	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
	at java.util.stream.ReferencePipeline$Head.forEachOrdered(ReferencePipeline.java:590)
	at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.onEventTime(AbstractKeyedCEPPatternOperator.java:234)
	at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:288)
	at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:734)
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)



Best,

Austin

Re: Flink CEP Watermark Exception

Posted by Austin Cawley-Edwards <au...@gmail.com>.
Hi Dawid,

Just back in the office. The platform we run on recently announced Flink
1.6.0 support, so we upgraded and haven't seen this problem arise again
yet! We believe it could have been the `equals` method falsely matching
different records in rare instances, though the upgrade to Flink 1.6.0
seemed to minimize those instances.
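
To illustrate what "falsely matching" means here, a small sketch under stated assumptions: both classes are hypothetical (neither is the production `AlertEvent`), and a plain `HashSet` stands in for any structure that looks entries up via `equals()`/`hashCode()`. It does not reproduce Flink's SharedBuffer.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

// Hypothetical illustration only; this does not reproduce Flink's SharedBuffer.
final class FalseMatchDemo {

    // Equality on the id alone: records with the same id but different
    // timestamps compare as equal.
    static final class IdOnlyEvent {
        final String id;
        final long timestamp;

        IdOnlyEvent(String id, long timestamp) {
            this.id = id;
            this.timestamp = timestamp;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof IdOnlyEvent
                && Objects.equals(id, ((IdOnlyEvent) o).id);
        }

        @Override
        public int hashCode() {
            return Objects.hash(id);
        }
    }

    // Equality on every field: the same two records stay distinct.
    static final class FullEvent {
        final String id;
        final long timestamp;

        FullEvent(String id, long timestamp) {
            this.id = id;
            this.timestamp = timestamp;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof FullEvent
                && Objects.equals(id, ((FullEvent) o).id)
                && timestamp == ((FullEvent) o).timestamp;
        }

        @Override
        public int hashCode() {
            return Objects.hash(id, timestamp);
        }
    }

    // Number of entries a hash-based set keeps for the two records.
    static int distinctCount(Object first, Object second) {
        Set<Object> seen = new HashSet<>();
        seen.add(first);
        seen.add(second);
        return seen.size();
    }

    public static void main(String[] args) {
        String id = "entity:medium:13790";
        System.out.println(distinctCount(
                new IdOnlyEvent(id, 1L), new IdOnlyEvent(id, 2L))); // 1
        System.out.println(distinctCount(
                new FullEvent(id, 1L), new FullEvent(id, 2L)));     // 2
    }
}
```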

Thank you for your help,
Austin

On Fri, Nov 2, 2018 at 5:02 AM Dawid Wysakowicz <dw...@apache.org>
wrote:

> Hi Austin,
>
> Could you provide the JobManager and TaskManager logs for a failed run? The
> exception you posted is thrown while processing messages rather than while
> restoring, yet you said the job failed to restore the checkpoint. How does it
> come to be processing messages? Could you also describe, step by step, the
> exact conditions under which the "IllegalStateException: Could not find
> previous entry with key" happens?
>
> The first two CEP issues you linked concern a very old Flink version
> (1.0.3). The CEP library has been heavily reworked since then, so I would
> not look for similarities in those cases.
>
> Best,
>
> Dawid

Re: Flink CEP Watermark Exception

Posted by Dawid Wysakowicz <dw...@apache.org>.
Hi Austin,

Could you provide the JobManager and TaskManager logs for a failed run?
The exception you posted is thrown while processing messages rather than
while restoring, yet you said the job failed to restore the checkpoint.
How does it come to be processing messages? Could you also describe, step
by step, the exact conditions under which the "IllegalStateException:
Could not find previous entry with key" happens?

The first two CEP issues you linked concern a very old Flink version
(1.0.3). The CEP library has been heavily reworked since then, so I would
not look for similarities in those cases.

Best,

Dawid

On 01/11/2018 14:24, Austin Cawley-Edwards wrote:
> Hi Dawid,
>
> Thank you for your reply. I'm out for the next few days, so I hope you
> don't mind me cc'ing my team in here. We all really appreciate you and
> the rest of the people monitoring the mailing list. 
>
>
> We've only seen this SharedBuffer problem in production, after sending
> around 20 GB of data through. In the Flink UI, we see the checkpoint
> status as:
>
> *Checkpoint failed: Checkpoint Coordinator is suspending.*
>
> It then tries to restore the last successful checkpoint, but cannot, and
> throws the SharedBuffer exception. Our state size is around 200 MB when it
> fails.
>
> Unfortunately, the platform we are running our cluster on only
> supports up to Flink 1.5. We will continue trying to find a
> reproducible example for you. I have found a few other people with a
> similar problem (attached at the bottom), but none seem to have been
> resolved. 
>

Re: Flink CEP Watermark Exception

Posted by Austin Cawley-Edwards <au...@gmail.com>.
Hi Dawid,

Thank you for your reply. I'm out for the next few days, so I hope you
don't mind me cc'ing my team in here. We all really appreciate you and the
rest of the people monitoring the mailing list.


We've only seen this SharedBuffer problem in production, after sending
around 20 GB of data through. In the Flink UI, we see the checkpoint status
as:


*Checkpoint failed: Checkpoint Coordinator is suspending.*

It then tries to restore the last successful checkpoint, but cannot, and
throws the SharedBuffer exception. Our state size is around 200 MB when it
fails.

Unfortunately, the platform we are running our cluster on only supports up
to Flink 1.5. We will continue trying to find a reproducible example for
you. I have found a few other people with a similar problem (attached at
the bottom), but none seem to have been resolved.

Our CEP setup looks generally like this:
// AlertEvent, Alert, and the alertEvents stream are defined elsewhere.
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

Pattern<AlertEvent, ?> pattern = Pattern.<AlertEvent>begin("alertOne")
    .where(new SimpleCondition<AlertEvent>() {
        @Override
        public boolean filter(AlertEvent event) {
            return event.level > 0;
        }
    })
    .next("alertTwo").subtype(AlertEvent.class)
    .where(new IterativeCondition<AlertEvent>() {
        @Override
        public boolean filter(AlertEvent subEvent,
                Context<AlertEvent> ctx) throws Exception {
            double surprisePercent = subEvent.surprise();

            // Reject the event if any "alertOne" event was more surprising.
            Iterable<AlertEvent> previousEvents =
                ctx.getEventsForPattern("alertOne");
            for (AlertEvent prevEvent : previousEvents) {
                double prevSurprisePercent = prevEvent.surprise();
                if (prevSurprisePercent > surprisePercent) {
                    return false;
                }
            }
            return true;
        }
    })
    .next("alertThree").subtype(AlertEvent.class)
    .where(new IterativeCondition<AlertEvent>() {
        @Override
        public boolean filter(AlertEvent subEvent,
                Context<AlertEvent> ctx) throws Exception {
            double surprisePercent = subEvent.surprise();

            // Reject the event if any "alertTwo" event was more surprising.
            Iterable<AlertEvent> previousEvents =
                ctx.getEventsForPattern("alertTwo");
            for (AlertEvent prevEvent : previousEvents) {
                double prevSurprisePercent = prevEvent.surprise();
                if (prevSurprisePercent > surprisePercent) {
                    return false;
                }
            }
            return true;
        }
    });

PatternStream<AlertEvent> alertPatternStream =
    CEP.pattern(alertEvents, pattern);
DataStream<Alert> confirmedAlerts = alertPatternStream
    .select(new PatternSelectFunction<AlertEvent, Alert>() {
        private static final long serialVersionUID = 1L;

        @Override
        public Alert select(Map<String, List<AlertEvent>> patternIn) {
            List<AlertEvent> alertEventList = new ArrayList<>();
            // Create an alert composed of three escalating events
            alertEventList.add(patternIn.get("alertOne").get(0));
            alertEventList.add(patternIn.get("alertTwo").get(0));
            alertEventList.add(patternIn.get("alertThree").get(0));
            Alert confirmedAlert = new Alert(alertEventList);
            return confirmedAlert;
        }
    })
    .uid("confirmedAlerts")
    .returns(Alert.class)
    .keyBy("id");
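
The two iterative conditions above apply the same rule: accept an event only if no earlier event in the preceding stage has a strictly greater surprise value. A minimal sketch of that rule in plain Java, without any Flink dependency, might look as follows; `EscalationCheck` and `noPreviousGreater` are hypothetical names used only for illustration.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical helper; mirrors the loop inside the two IterativeConditions.
final class EscalationCheck {

    // True when no previous value is strictly greater than the current one.
    // An empty history therefore always passes.
    static boolean noPreviousGreater(double current, Iterable<Double> previous) {
        for (double prev : previous) {
            if (prev > current) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<Double> previous = Arrays.asList(1.5, 2.0);
        System.out.println(noPreviousGreater(2.0, previous)); // true
        System.out.println(noPreviousGreater(1.9, previous)); // false
        System.out.println(noPreviousGreater(0.0, Collections.<Double>emptyList())); // true
    }
}
```

Note that the comparison is strict, so an event whose surprise equals that of a previous event still passes.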

Once again thank you,
Austin


- https://stackoverflow.com/questions/36917569/flink-streaming-cep-could-not-find-previous-shared-buffer-entry-with-key
- https://mail-archives.apache.org/mod_mbox/flink-user/201604.mbox/%3CCAC27z=ND0sX_OGBh0vuvZwEtCke2T40tBXHt5huH8uyZDTEqgg@mail.gmail.com%3E
- https://mail-archives.apache.org/mod_mbox/flink-user/201702.mbox/%3CCAGr9p8CzT3=cr+cOas=3k0BeCMiybfv+kT1fXv_RF8xaBTudvQ@mail.gmail.com%3E


On Thu, Nov 1, 2018 at 4:44 AM Dawid Wysakowicz <dw...@apache.org>
wrote:

> Hi Austin,
>
> Could you elaborate a bit more on what you mean by "after a checkpoint
> fails"? Why does the checkpoint fail? Would it be possible for you to
> prepare a reproducible example of the problem? Finally, I would also
> recommend trying out Flink 1.6.x, as we reworked SharedBuffer, the
> underlying structure for CEP.
>
> Best,
>
> Dawid

Re: Flink CEP Watermark Exception

Posted by Dawid Wysakowicz <dw...@apache.org>.
Hi Austin,

Could you elaborate a bit more on what you mean by "after a checkpoint
fails"? Why does the checkpoint fail? Would it be possible for you to
prepare a reproducible example of the problem? Finally, I would also
recommend trying out Flink 1.6.x, as we reworked SharedBuffer, the
underlying structure for CEP.

Best,

Dawid

On 30/10/2018 20:59, Austin Cawley-Edwards wrote:
> Following up, we are using Flink 1.5.0 and Flink-CEP 2.11.
>
> Thanks,
> Austin

Re: Flink CEP Watermark Exception

Posted by Austin Cawley-Edwards <au...@gmail.com>.
Following up, we are using Flink 1.5.0 and Flink-CEP 2.11.

Thanks,
Austin
