You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "daiqing (JIRA)" <ji...@apache.org> on 2017/08/12 14:25:00 UTC

[jira] [Created] (FLINK-7435) FsStateBackend with incremental backup enable does not work with Keyed CEP

daiqing created FLINK-7435:
------------------------------

             Summary: FsStateBackend with incremental backup enable does not work with Keyed CEP
                 Key: FLINK-7435
                 URL: https://issues.apache.org/jira/browse/FLINK-7435
             Project: Flink
          Issue Type: Bug
          Components: CEP
    Affects Versions: 1.3.2, 1.3.1
         Environment: AWS EMR YARN, use CEP with pattern start -> next (oneOrMore.Optional.Consective) -> next(end). Store it with FsStatebackend with Incremental option open. 
            Reporter: daiqing


java.lang.RuntimeException: Exception occurred while processing valve output watermark: 
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
	at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
	at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Could not copy NFA.
	at org.apache.flink.cep.nfa.NFA$NFASerializer.copy(NFA.java:908)
	at org.apache.flink.cep.nfa.NFA$NFASerializer.copy(NFA.java:852)
	at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:279)
	at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:296)
	at org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:55)
	at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.getNFA(AbstractKeyedCEPPatternOperator.java:268)
	at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.onEventTime(AbstractKeyedCEPPatternOperator.java:230)
	at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:275)
	at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:107)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:946)
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:286)
	... 7 more
Caused by: java.io.StreamCorruptedException: invalid type code: 00
	at java.io.ObjectInputStream.readTypeString(ObjectInputStream.java:1620)
	at java.io.ObjectStreamClass.readNonProxy(ObjectStreamClass.java:719)
	at java.io.ObjectInputStream.readClassDescriptor(ObjectInputStream.java:882)
	at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1815)
	at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2000)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
	at org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeCondition(NFA.java:1211)
	at org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeStates(NFA.java:1169)
	at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:957)
	at org.apache.flink.cep.nfa.NFA$NFASerializer.copy(NFA.java:903)
	... 17 more



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)