You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Thomas Wozniakowski (JIRA)" <ji...@apache.org> on 2018/12/12 11:24:00 UTC

[jira] [Comment Edited] (FLINK-10960) CEP: Job Failure when .times(2) is used

    [ https://issues.apache.org/jira/browse/FLINK-10960?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16718816#comment-16718816 ] 

Thomas Wozniakowski edited comment on FLINK-10960 at 12/12/18 11:23 AM:
------------------------------------------------------------------------

Hi [~dawidwys],

I'm hoping you might be able to point me in the right direction, we just had this same error in production with different parameters. Below is the error I got:


{code:java}
java.lang.RuntimeException: Exception occurred while processing valve output watermark:
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
	at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
	at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputStreamStatus(StatusWatermarkValve.java:152)
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.util.FlinkRuntimeException: State purchaseSequence:4 does not exist in the NFA. NFA has states [Normal State purchaseSequence [
	StateTransition(TAKE, from purchaseSequenceto $endState$, with condition),
	StateTransition(IGNORE, from purchaseSequenceto purchaseSequence, with condition),
]), Final State $endState$ [
]), Normal State purchaseSequence:2 [
	StateTransition(TAKE, from purchaseSequence:2to purchaseSequence:1, with condition),
	StateTransition(IGNORE, from purchaseSequence:2to purchaseSequence:2, with condition),
]), Start State purchaseSequence:3 [
	StateTransition(TAKE, from purchaseSequence:3to purchaseSequence:2, with condition),
]), Normal State purchaseSequence:0 [
	StateTransition(TAKE, from purchaseSequence:0to purchaseSequence, with condition),
	StateTransition(IGNORE, from purchaseSequence:0to purchaseSequence:0, with condition),
]), Normal State purchaseSequence:1 [
	StateTransition(TAKE, from purchaseSequence:1to purchaseSequence:0, with condition),
	StateTransition(IGNORE, from purchaseSequence:1to purchaseSequence:1, with condition),
])]
	at org.apache.flink.cep.nfa.NFA.isStartState(NFA.java:144)
	at org.apache.flink.cep.nfa.NFA.isStateTimedOut(NFA.java:270)
	at org.apache.flink.cep.nfa.NFA.advanceTime(NFA.java:244)
	at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.advanceTime(AbstractKeyedCEPPatternOperator.java:389)
	at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.onEventTime(AbstractKeyedCEPPatternOperator.java:293)
	at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:251)
	at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:746)
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
{code}

I think this is something to do with savepoint restores. In this case we were making a config change that stopped the job with a savepoint, then started it again with slightly different parameters. One of these changed {{.times(8)}} to {{.times(5)}} on one of our CEP operators.

Our automated build process has E2E tests for this exact case, so I don't think it's a limitation in Flink. I'm a bit at a loss here to work out what the problem is. We got the job running by deleting the savepoint and starting the job from scratch.

What does that stacktrace suggest to you? I'm running lots of local tests with variants of 

1. Stopping job with savepoint
2. Changing config (driving changes in CEP.Pattern()) operators
3. Restarting the job from the savepoint

but I haven't managed to recreate the error locally...


was (Author: jamalarm):
Hi [~dawidwys],

I'm hoping you might be able to point me in the right direction, we just had this same error in production with different parameters. Below is the error I got:


{code:text}
java.lang.RuntimeException: Exception occurred while processing valve output watermark:
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
	at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
	at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputStreamStatus(StatusWatermarkValve.java:152)
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.util.FlinkRuntimeException: State purchaseSequence:4 does not exist in the NFA. NFA has states [Normal State purchaseSequence [
	StateTransition(TAKE, from purchaseSequenceto $endState$, with condition),
	StateTransition(IGNORE, from purchaseSequenceto purchaseSequence, with condition),
]), Final State $endState$ [
]), Normal State purchaseSequence:2 [
	StateTransition(TAKE, from purchaseSequence:2to purchaseSequence:1, with condition),
	StateTransition(IGNORE, from purchaseSequence:2to purchaseSequence:2, with condition),
]), Start State purchaseSequence:3 [
	StateTransition(TAKE, from purchaseSequence:3to purchaseSequence:2, with condition),
]), Normal State purchaseSequence:0 [
	StateTransition(TAKE, from purchaseSequence:0to purchaseSequence, with condition),
	StateTransition(IGNORE, from purchaseSequence:0to purchaseSequence:0, with condition),
]), Normal State purchaseSequence:1 [
	StateTransition(TAKE, from purchaseSequence:1to purchaseSequence:0, with condition),
	StateTransition(IGNORE, from purchaseSequence:1to purchaseSequence:1, with condition),
])]
	at org.apache.flink.cep.nfa.NFA.isStartState(NFA.java:144)
	at org.apache.flink.cep.nfa.NFA.isStateTimedOut(NFA.java:270)
	at org.apache.flink.cep.nfa.NFA.advanceTime(NFA.java:244)
	at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.advanceTime(AbstractKeyedCEPPatternOperator.java:389)
	at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.onEventTime(AbstractKeyedCEPPatternOperator.java:293)
	at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:251)
	at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:746)
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
{code}

I think this is something to do with savepoint restores. In this case we were making a config change that stopped the job with a savepoint, then started it again with slightly different parameters. One of these changed {{.times(8)}} to {{.times(5)}} on one of our CEP operators.

Our automated build process has E2E tests for this exact case, so I don't think it's a limitation in Flink. I'm a bit at a loss here to work out what the problem is. We got the job running by deleting the savepoint and starting the job from scratch.

What does that stacktrace suggest to you? I'm running lots of local tests with variants of 

1. Stopping job with savepoint
2. Changing config (driving changes in CEP.Pattern()) operators
3. Restarting the job from the savepoint

but I haven't managed to recreate the error locally...

> CEP: Job Failure when .times(2) is used
> ---------------------------------------
>
>                 Key: FLINK-10960
>                 URL: https://issues.apache.org/jira/browse/FLINK-10960
>             Project: Flink
>          Issue Type: Bug
>          Components: CEP
>    Affects Versions: 1.6.2
>            Reporter: Thomas Wozniakowski
>            Priority: Major
>
> Hi Guys,
> Encountered a strange one today. We use the CEP library in a configurable way where we plug a config file into the Flink Job JAR and it programmatically sets up a bunch of CEP operators matching the config file.
> I encountered a strange bug when I was testing with some artificially low numbers in our testing environment today. The CEP code we're using (modified slightly) is:
> {code:java}
> Pattern.begin(EVENT_SEQUENCE, AfterMatchSkipStrategy.skipPastLastEvent())
>             .times(config.getNumberOfUniqueEvents())
>             .where(uniquenessCheckOnAlreadyMatchedEvents())
>             .within(seconds(config.getWithinSeconds()));
> {code}
> When using the {{numberOfUniqueEvents: 2}}, I started seeing the following error killing the job whenever a match was detected:
> {quote}
> ava.lang.RuntimeException: Exception occurred while processing valve output watermark: 
> 	at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
> 	at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
> 	at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
> 	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
> 	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> 	at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.flink.util.FlinkRuntimeException: State eventSequence:2 does not exist in the NFA. NFA has states [Final State $endState$ [
> ]), Normal State eventSequence [
> 	StateTransition(TAKE, from eventSequenceto $endState$, with condition),
> 	StateTransition(IGNORE, from eventSequenceto eventSequence, with condition),
> ]), Start State eventSequence:0 [
> 	StateTransition(TAKE, from eventSequence:0to eventSequence, with condition),
> ])]
> 	at org.apache.flink.cep.nfa.NFA.isStartState(NFA.java:144)
> 	at org.apache.flink.cep.nfa.NFA.isStateTimedOut(NFA.java:270)
> 	at org.apache.flink.cep.nfa.NFA.advanceTime(NFA.java:244)
> 	at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.advanceTime(AbstractKeyedCEPPatternOperator.java:389)
> 	at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.onEventTime(AbstractKeyedCEPPatternOperator.java:293)
> 	at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:251)
> 	at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:746)
> 	at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
> {quote}
> Changing the config to {{numberOfUniqueEvents: 3}} fixed the problem. Changing it back to 2 brought the problem back. It seems to be specifically related to the value of 2.
> This is not a blocking issue for me because we typically use much higher numbers than this in production anyway, but I figured you guys might want to know about this issue.
> Let me know if you need any more information.
> Tom



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)