Posted to user@flink.apache.org by Kostas Kloudas <k....@data-artisans.com> on 2018/02/06 17:03:53 UTC

Re: Flink CEP exception during RocksDB update

Hi Varun,

The branch I previously sent you has now been merged into master.
Could you try master and tell us whether the behavior changes?
Has the problem been fixed, or has the exception message changed?

Thanks, 
Kostas

> On Jan 29, 2018, at 10:09 AM, Kostas Kloudas <k....@data-artisans.com> wrote:
> 
> Hi again Varun,
> 
> I am investigating the problem you mentioned and I found a bug in the SharedBuffer, 
> but I am not sure if it is the only bug that affects you.
> 
> Could you please try this branch https://github.com/kl0u/flink/tree/cep-inv and let me know
> if the problem is still there?
> 
> In addition, are you using Scala with case classes or Java?
> 
> Thanks for helping fix the problem,
> Kostas
> 
>> On Jan 24, 2018, at 5:54 PM, Kostas Kloudas <k.kloudas@data-artisans.com> wrote:
>> 
>> Hi Varun,
>> 
>> Thanks for taking the time to look into it. Could you share a sample input and your pattern so we can reproduce the problem?
>> That would help a lot in figuring out the cause.
>> 
>> Thanks,
>> Kostas
>> 
>>> On Jan 23, 2018, at 5:40 PM, Varun Dhore <varundhore21@gmail.com> wrote:
>>> 
>>> Hi Kostas,
>>> 
>>> I was able to reproduce the error with 1.4.0. After upgrading the cluster to a 1.5 snapshot and running the same data through it, I still get the same exception. The CEP patterns I am running use followed-by patterns, e.g. AfBfC. In my experience, I have never been able to get stable execution with checkpoints enabled; when I disable checkpoints, the CEP jobs run fine. Aside from this particular error, I also notice that the majority of checkpoints expire because they do not complete within the configured 5 min timeout. Any suggestions on further debugging runtime checkpoints would be very helpful.
>>> Thanks in advance for your assistance.
>>> 
>>> Regards,
>>> Varun 
>>> 
>>> On Jan 18, 2018, at 8:11 AM, Kostas Kloudas <k.kloudas@data-artisans.com> wrote:
>>> 
>>>> Thanks a lot Varun!
>>>> 
>>>> Kostas
>>>> 
>>>>> On Jan 17, 2018, at 9:59 PM, Varun Dhore <varundhore21@gmail.com> wrote:
>>>>> 
>>>>> Thank you Kostas. Since this error is not easily reproducible on my end I’ll continue testing this and confirm the resolution once I am able to do so.
>>>>> 
>>>>> Thanks,
>>>>> Varun 
>>>>> 
>>>>> On Jan 15, 2018, at 10:21 AM, Kostas Kloudas <k.kloudas@data-artisans.com> wrote:
>>>>> 
>>>>>> Hi Varun,
>>>>>> 
>>>>>> This may be related to this issue: https://issues.apache.org/jira/browse/FLINK-8226
>>>>>> which is currently fixed on master.
>>>>>> 
>>>>>> Could you please try the current master to see if the error persists?
>>>>>> 
>>>>>> Thanks,
>>>>>> Kostas
>>>>>> 
>>>>>>> On Jan 15, 2018, at 4:09 PM, Varun Dhore <varundhore21@gmail.com> wrote:
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>>> Hello Flink community,
>>>>>>>>  
>>>>>>>> I have encountered the following exception while testing the 1.4.0 release. The error occurs intermittently, and my CEP job keeps restarting after it. I am running the job with event-time semantics and checkpoints enabled.
>>>>>>>>  
>>>>>>>>  
>>>>>>>>             java.lang.RuntimeException: Exception occurred while processing valve output watermark:
>>>>>>>>                         at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
>>>>>>>>                         at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
>>>>>>>>                         at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
>>>>>>>>                         at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
>>>>>>>>                         at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>>>>>>>>                         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>>>>>>>>                         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>>>>>>>>                         at java.lang.Thread.run(Thread.java:745)
>>>>>>>>             Caused by: java.lang.RuntimeException: Error while adding data to RocksDB
>>>>>>>>                         at org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:102)
>>>>>>>>                         at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.updateNFA(AbstractKeyedCEPPatternOperator.java:276)
>>>>>>>>                         at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.onEventTime(AbstractKeyedCEPPatternOperator.java:248)
>>>>>>>>                         at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:275)
>>>>>>>>                         at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:107)
>>>>>>>>                         at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:946)
>>>>>>>>                         at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:286)
>>>>>>>>                         ... 7 more
>>>>>>>>             Caused by: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper(Event(id: 1,name: ”e1”, timestamp: 1515593398897), 1515593398897, 0), [SharedBufferEdge(null, 1)], 2)
>>>>>>>>                         at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
>>>>>>>>                         at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:972)
>>>>>>>>                         at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:839)
>>>>>>>>                         at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:919)
>>>>>>>>                         at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:839)
>>>>>>>>                         at org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:99)
>>>>>>>>                         ... 13 more
>>>>>>>>  
>>>>>>>>  
>>>>>>>> Thanks,
>>>>>>>> Varun