Posted to user@flink.apache.org by Shailesh Jain <sh...@stellapps.com> on 2018/10/25 03:41:39 UTC

Re: FlinkCEP, circular references and checkpointing failures

Hi Dawid,

I've upgraded to Flink 1.6.1 and rebased my changes against the tag 1.6.1;
the only commit on top of 1.6 is this:
https://github.com/jainshailesh/flink/commit/797e3c4af5b28263fd98fb79daaba97cabf3392c

I ran two otherwise identical jobs (with and without checkpointing enabled).
I'm hitting an ArrayIndexOutOfBoundsException (and sometimes an NPE) *only when
checkpointing (HDFS backend) is enabled*, with the stack trace below.

I did see a similar problem with different operators here (
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/IndexOutOfBoundsException-on-deserialization-after-updating-to-1-6-1-td23933.html).
Is this a known issue which is getting addressed?

Any ideas on what could be causing this?

Thanks,
Shailesh


2018-10-24 17:04:13,365 INFO
org.apache.flink.runtime.taskmanager.Task                     -
SelectCepOperatorMixedTime (1/1) - SelectCepOperatorMixedTime (1/1)
(3d984b7919342a3886593401088ca2cd) switched from RUNNING to FAILED.
org.apache.flink.util.FlinkRuntimeException: Failure happened in filter
function.
        at org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:731)
        at org.apache.flink.cep.nfa.NFA.computeNextStates(NFA.java:541)
        at org.apache.flink.cep.nfa.NFA.doProcess(NFA.java:284)
        at org.apache.flink.cep.nfa.NFA.process(NFA.java:220)
        at
org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.processEvent(AbstractKeyedCEPPatternOperator.java:379)
        at
org.apache.flink.cep.operator.AbstractKeyedCEPPatternMixedTimeApproachOperator.processElement(AbstractKeyedCEPPatternMixedTimeApproachOperator.java:45)
        at
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
        at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
        at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.WrappingRuntimeException:
java.lang.ArrayIndexOutOfBoundsException: -1
        at
org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.lambda$materializeMatch$1(SharedBuffer.java:305)
        at java.util.HashMap.computeIfAbsent(HashMap.java:1127)
        at
org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.materializeMatch(SharedBuffer.java:301)
        at
org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.materializeMatch(SharedBuffer.java:291)
        at
org.apache.flink.cep.nfa.NFA$ConditionContext.getEventsForPattern(NFA.java:811)
        at
com.stellapps.contrakcep.flink.patterns.critical.AgitatorMalfunctionAfterChillingPattern$1.filter(AgitatorMalfunctionAfterChillingPattern.java:70)
        at
com.stellapps.contrakcep.flink.patterns.critical.AgitatorMalfunctionAfterChillingPattern$1.filter(AgitatorMalfunctionAfterChillingPattern.java:62)
        at org.apache.flink.cep.nfa.NFA.checkFilterCondition(NFA.java:742)
        at org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:716)
        ... 10 more
Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
        at com.esotericsoftware.kryo.util.IntArray.add(IntArray.java:61)
        at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:800)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:655)
        at
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:231)
        at
org.apache.flink.cep.nfa.sharedbuffer.Lockable$LockableTypeSerializer.copy(Lockable.java:120)
        at
org.apache.flink.cep.nfa.sharedbuffer.Lockable$LockableTypeSerializer.copy(Lockable.java:95)
        at
org.apache.flink.api.common.typeutils.base.MapSerializer.copy(MapSerializer.java:113)
        at
org.apache.flink.api.common.typeutils.base.MapSerializer.copy(MapSerializer.java:49)
        at
org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:287)
        at
org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:311)
        at
org.apache.flink.runtime.state.heap.HeapMapState.get(HeapMapState.java:85)
        at
org.apache.flink.runtime.state.UserFacingMapState.get(UserFacingMapState.java:47)
        at
org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.lambda$materializeMatch$1(SharedBuffer.java:303)
        ... 18 more

On Fri, Sep 28, 2018 at 11:00 AM Shailesh Jain <sh...@stellapps.com>
wrote:

> Hi Dawid,
>
> Thanks for your time on this. The diff should have pointed out only the
> top 3 commits, but since it did not, it is possible I did not rebase my
> branch against 1.4.2 correctly. I'll check this out and get back to you if
> I hit the same issue again.
>
> Thanks again,
> Shailesh
>
> On Thu, Sep 27, 2018 at 1:00 PM Dawid Wysakowicz <dw...@apache.org>
> wrote:
>
>> Hi Shailesh,
>>
>> I am afraid it is going to be hard to help you, as this branch differs
>> significantly from the 1.4.2 release (I diffed your branch against
>> tag/release-1.4.2). Moreover, the code in the branch you've provided still
>> does not correspond to the lines in the exception you posted previously.
>> Could you check whether the problem occurs on vanilla Flink as well?
>>
>> Best,
>>
>> Dawid
>>
>> On 27/09/18 08:22, Shailesh Jain wrote:
>>
>> Hi Dawid,
>>
>> Yes, it is version 1.4.2. We are running vanilla flink, but have added a
>> couple of changes in the CEP operator specifically (top 3 commits here:
>> https://github.com/jainshailesh/flink/commits/poc_on_1.4.2). Changes
>> I've made to CEP operators do not touch the checkpointing path, just
>> overloading the operator for a specific way of handling event time.
>>
>> We are hitting this in production, so I'm not sure it'll be feasible to
>> move to 1.6.0 immediately, but eventually yes.
>>
>> Thanks,
>> Shailesh
>>
>> On Wed, Sep 26, 2018 at 5:44 PM Dawid Wysakowicz <dw...@apache.org>
>> wrote:
>>
>>> Hi Shailesh,
>>>
>>> Are you sure you are using version 1.4.2? Do you run vanilla Flink, or
>>> have you introduced some changes? I am asking because the lines in the
>>> stack trace do not align with the source code for 1.4.2.
>>>
>>> Also, it is a different exception from the one in the issue you've
>>> linked, so if it is a problem then it is definitely a different one. Last
>>> thing: I would recommend upgrading to the newest version, as we have
>>> rewritten the SharedBuffer implementation in 1.6.0.
>>>
>>> Best,
>>>
>>> Dawid
>>>
>>> On 26/09/18 13:50, Shailesh Jain wrote:
>>>
>>> Hi,
>>>
>>> I think I've hit this same issue on a 3 node standalone cluster (1.4.2)
>>> using HDFS (2.8.4) as state backend.
>>>
>>> 2018-09-26 17:07:39,370 INFO
>>> org.apache.flink.runtime.taskmanager.Task                     - Attempting
>>> to fail task externally SelectCepOperator (1/1)
>>> (3bec4aa1ef2226c4e0c5ff7b3860d340).
>>> 2018-09-26 17:07:39,370 INFO
>>> org.apache.flink.runtime.taskmanager.Task                     -
>>> SelectCepOperator (1/1) (3bec4aa1ef2226c4e0c5ff7b3860d340) switched from
>>> RUNNING to FAILED.
>>> AsynchronousException{java.lang.Exception: Could not materialize
>>> checkpoint 6 for operator SelectCepOperator (1/1).}
>>>     at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:948)
>>>     at
>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>     at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>     at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>     at java.lang.Thread.run(Thread.java:748)
>>> Caused by: java.lang.Exception: Could not materialize checkpoint 6 for
>>> operator SelectCepOperator (1/1).
>>>     ... 6 more
>>> Caused by: java.util.concurrent.ExecutionException:
>>> java.lang.NullPointerException
>>>     at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>>     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>>     at
>>> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>>>     at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
>>>     ... 5 more
>>>     Suppressed: java.lang.Exception: Could not properly cancel managed
>>> keyed state future.
>>>         at
>>> org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:91)
>>>         at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:976)
>>>         at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:939)
>>>         ... 5 more
>>>     Caused by: java.util.concurrent.ExecutionException:
>>> java.lang.NullPointerException
>>>         at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>>         at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>>         at
>>> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>>>         at
>>> org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:66)
>>>         at
>>> org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:89)
>>>         ... 7 more
>>>     Caused by: java.lang.NullPointerException
>>>         at
>>> org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:954)
>>>         at
>>> org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:825)
>>>         at
>>> org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:888)
>>>         at
>>> org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:820)
>>>         at
>>> org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot.writeMappingsInKeyGroup(CopyOnWriteStateTableSnapshot.java:196)
>>>         at
>>> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:390)
>>>         at
>>> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:339)
>>>         at
>>> org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources.call(AbstractAsyncCallableWithResources.java:75)
>>>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>         at
>>> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
>>>         at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
>>>         ... 5 more
>>>     [CIRCULAR REFERENCE:java.lang.NullPointerException]
>>>
>>> Any ideas on why I'm hitting this, especially when this (
>>> https://issues.apache.org/jira/browse/FLINK-7756) says it has been
>>> fixed in 1.4.2?
>>>
>>> On Sat, Nov 4, 2017 at 12:34 AM Federico D'Ambrosio <
>>> federico.dambrosio@smartlab.ws> wrote:
>>>
>>>> Thank you very much for your steady response, Kostas!
>>>>
>>>> Cheers,
>>>> Federico
>>>>
>>>> 2017-11-03 16:26 GMT+01:00 Kostas Kloudas <k....@data-artisans.com>
>>>> :
>>>>
>>>>> Hi Federico,
>>>>>
>>>>> Thanks for trying it out!
>>>>> Great to hear that your problem was fixed!
>>>>>
>>>>> The feature freeze for the release is going to be next week, and I
>>>>> would expect 1 or 2 more weeks testing.
>>>>> So I would say in 2.5 weeks. But this is of course subject to
>>>>> potential issues we may find during testing.
>>>>>
>>>>> Cheers,
>>>>> Kostas
>>>>>
>>>>> On Nov 3, 2017, at 4:22 PM, Federico D'Ambrosio <
>>>>> federico.dambrosio@smartlab.ws> wrote:
>>>>>
>>>>> Hi Kostas,
>>>>>
>>>>> I just tried running the same job with 1.4-SNAPSHOT for 10 minutes and
>>>>> it didn't crash, so that was the same underlying issue of the JIRA you
>>>>> linked.
>>>>>
>>>>> Do you happen to know when the 1.4 stable release is expected?
>>>>>
>>>>> Thank you very much,
>>>>> Federico
>>>>>
>>>>> 2017-11-03 15:25 GMT+01:00 Kostas Kloudas <k.kloudas@data-artisans.com
>>>>> >:
>>>>>
>>>>>> Perfect! thanks a lot!
>>>>>>
>>>>>> Kostas
>>>>>>
>>>>>> On Nov 3, 2017, at 3:23 PM, Federico D'Ambrosio <
>>>>>> federico.dambrosio@smartlab.ws> wrote:
>>>>>>
>>>>>> Hi Kostas,
>>>>>>
>>>>>> yes, I'm using 1.3.2. I'll try the current master and I'll get back
>>>>>> to you.
>>>>>>
>>>>>> 2017-11-03 15:21 GMT+01:00 Kostas Kloudas <
>>>>>> k.kloudas@data-artisans.com>:
>>>>>>
>>>>>>> Hi Federico,
>>>>>>>
>>>>>>> I assume that you are using Flink 1.3, right?
>>>>>>>
>>>>>>> In this case, in 1.4 we have fixed a bug that seems similar to your
>>>>>>> case:
>>>>>>> https://issues.apache.org/jira/browse/FLINK-7756
>>>>>>>
>>>>>>> Could you try the current master to see if it fixes your problem?
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Kostas
>>>>>>>
>>>>>>> On Nov 3, 2017, at 3:12 PM, Federico D'Ambrosio <
>>>>>>> federico.dambrosio@smartlab.ws> wrote:
>>>>>>>
>>>>>>>  Could not find id for
>>>>>>> entry:
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Federico D'Ambrosio
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Federico D'Ambrosio
>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Federico D'Ambrosio
>>>>
>>>
>>>
>>

Re: FlinkCEP, circular references and checkpointing failures

Posted by Shailesh Jain <sh...@stellapps.com>.
Bump.

On Thu, Oct 25, 2018 at 9:11 AM Shailesh Jain <sh...@stellapps.com>
wrote:

> Hi Dawid,
>
> I've upgraded to Flink 1.6.1 and rebased my changes against the tag 1.6.1;
> the only commit on top of 1.6 is this:
> https://github.com/jainshailesh/flink/commit/797e3c4af5b28263fd98fb79daaba97cabf3392c
>
> I ran two otherwise identical jobs (with and without checkpointing
> enabled). I'm hitting an ArrayIndexOutOfBoundsException (and sometimes an NPE) *only
> when checkpointing (HDFS backend) is enabled*, with the stack trace below.
>
> I did see a similar problem with different operators here (
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/IndexOutOfBoundsException-on-deserialization-after-updating-to-1-6-1-td23933.html).
> Is this a known issue which is getting addressed?
>
> Any ideas on what could be causing this?
>
> Thanks,
> Shailesh
>
>
> 2018-10-24 17:04:13,365 INFO
> org.apache.flink.runtime.taskmanager.Task                     -
> SelectCepOperatorMixedTime (1/1) - SelectCepOperatorMixedTime (1/1)
> (3d984b7919342a3886593401088ca2cd) switched from RUNNING to FAILED.
> org.apache.flink.util.FlinkRuntimeException: Failure happened in filter
> function.
>         at org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:731)
>         at org.apache.flink.cep.nfa.NFA.computeNextStates(NFA.java:541)
>         at org.apache.flink.cep.nfa.NFA.doProcess(NFA.java:284)
>         at org.apache.flink.cep.nfa.NFA.process(NFA.java:220)
>         at
> org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.processEvent(AbstractKeyedCEPPatternOperator.java:379)
>         at
> org.apache.flink.cep.operator.AbstractKeyedCEPPatternMixedTimeApproachOperator.processElement(AbstractKeyedCEPPatternMixedTimeApproachOperator.java:45)
>         at
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>         at
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>         at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.WrappingRuntimeException:
> java.lang.ArrayIndexOutOfBoundsException: -1
>         at
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.lambda$materializeMatch$1(SharedBuffer.java:305)
>         at java.util.HashMap.computeIfAbsent(HashMap.java:1127)
>         at
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.materializeMatch(SharedBuffer.java:301)
>         at
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.materializeMatch(SharedBuffer.java:291)
>         at
> org.apache.flink.cep.nfa.NFA$ConditionContext.getEventsForPattern(NFA.java:811)
>         at
> com.stellapps.contrakcep.flink.patterns.critical.AgitatorMalfunctionAfterChillingPattern$1.filter(AgitatorMalfunctionAfterChillingPattern.java:70)
>         at
> com.stellapps.contrakcep.flink.patterns.critical.AgitatorMalfunctionAfterChillingPattern$1.filter(AgitatorMalfunctionAfterChillingPattern.java:62)
>         at org.apache.flink.cep.nfa.NFA.checkFilterCondition(NFA.java:742)
>         at org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:716)
>         ... 10 more
> Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
>         at com.esotericsoftware.kryo.util.IntArray.add(IntArray.java:61)
>         at
> com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:800)
>         at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:655)
>         at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:231)
>         at
> org.apache.flink.cep.nfa.sharedbuffer.Lockable$LockableTypeSerializer.copy(Lockable.java:120)
>         at
> org.apache.flink.cep.nfa.sharedbuffer.Lockable$LockableTypeSerializer.copy(Lockable.java:95)
>         at
> org.apache.flink.api.common.typeutils.base.MapSerializer.copy(MapSerializer.java:113)
>         at
> org.apache.flink.api.common.typeutils.base.MapSerializer.copy(MapSerializer.java:49)
>         at
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:287)
>         at
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:311)
>         at
> org.apache.flink.runtime.state.heap.HeapMapState.get(HeapMapState.java:85)
>         at
> org.apache.flink.runtime.state.UserFacingMapState.get(UserFacingMapState.java:47)
>         at
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.lambda$materializeMatch$1(SharedBuffer.java:303)
>         ... 18 more
>
> On Fri, Sep 28, 2018 at 11:00 AM Shailesh Jain <
> shailesh.jain@stellapps.com> wrote:
>
>> Hi Dawid,
>>
>> Thanks for your time on this. The diff should have pointed out only the
>> top 3 commits, but since it did not, it is possible I did not rebase my
>> branch against 1.4.2 correctly. I'll check this out and get back to you if
>> I hit the same issue again.
>>
>> Thanks again,
>> Shailesh
>>
>> On Thu, Sep 27, 2018 at 1:00 PM Dawid Wysakowicz <dw...@apache.org>
>> wrote:
>>
>>> Hi Shailesh,
>>>
>>> I am afraid it is going to be hard to help you, as this branch differs
>>> significantly from the 1.4.2 release (I diffed your branch against
>>> tag/release-1.4.2). Moreover, the code in the branch you've provided still
>>> does not correspond to the lines in the exception you posted previously.
>>> Could you check whether the problem occurs on vanilla Flink as well?
>>>
>>> Best,
>>>
>>> Dawid
>>>
>>> On 27/09/18 08:22, Shailesh Jain wrote:
>>>
>>> Hi Dawid,
>>>
>>> Yes, it is version 1.4.2. We are running vanilla flink, but have added a
>>> couple of changes in the CEP operator specifically (top 3 commits here:
>>> https://github.com/jainshailesh/flink/commits/poc_on_1.4.2). Changes
>>> I've made to CEP operators do not touch the checkpointing path, just
>>> overloading the operator for a specific way of handling event time.
>>>
>>> We are hitting this in production, so I'm not sure it'll be feasible to
>>> move to 1.6.0 immediately, but eventually yes.
>>>
>>> Thanks,
>>> Shailesh
>>>
>>> On Wed, Sep 26, 2018 at 5:44 PM Dawid Wysakowicz <dw...@apache.org>
>>> wrote:
>>>
>>>> Hi Shailesh,
>>>>
>>>> Are you sure you are using version 1.4.2? Do you run vanilla Flink,
>>>> or have you introduced some changes? I am asking because the lines in
>>>> the stack trace do not align with the source code for 1.4.2.
>>>>
>>>> Also, it is a different exception from the one in the issue you've
>>>> linked, so if it is a problem then it is definitely a different one. Last
>>>> thing: I would recommend upgrading to the newest version, as we have
>>>> rewritten the SharedBuffer implementation in 1.6.0.
>>>>
>>>> Best,
>>>>
>>>> Dawid
>>>>
>>>> On 26/09/18 13:50, Shailesh Jain wrote:
>>>>
>>>> Hi,
>>>>
>>>> I think I've hit this same issue on a 3 node standalone cluster (1.4.2)
>>>> using HDFS (2.8.4) as state backend.
>>>>
>>>> 2018-09-26 17:07:39,370 INFO
>>>> org.apache.flink.runtime.taskmanager.Task                     - Attempting
>>>> to fail task externally SelectCepOperator (1/1)
>>>> (3bec4aa1ef2226c4e0c5ff7b3860d340).
>>>> 2018-09-26 17:07:39,370 INFO
>>>> org.apache.flink.runtime.taskmanager.Task                     -
>>>> SelectCepOperator (1/1) (3bec4aa1ef2226c4e0c5ff7b3860d340) switched from
>>>> RUNNING to FAILED.
>>>> AsynchronousException{java.lang.Exception: Could not materialize
>>>> checkpoint 6 for operator SelectCepOperator (1/1).}
>>>>     at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:948)
>>>>     at
>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>     at
>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>     at
>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>     at java.lang.Thread.run(Thread.java:748)
>>>> Caused by: java.lang.Exception: Could not materialize checkpoint 6 for
>>>> operator SelectCepOperator (1/1).
>>>>     ... 6 more
>>>> Caused by: java.util.concurrent.ExecutionException:
>>>> java.lang.NullPointerException
>>>>     at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>>>     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>>>     at
>>>> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>>>>     at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
>>>>     ... 5 more
>>>>     Suppressed: java.lang.Exception: Could not properly cancel managed
>>>> keyed state future.
>>>>         at
>>>> org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:91)
>>>>         at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:976)
>>>>         at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:939)
>>>>         ... 5 more
>>>>     Caused by: java.util.concurrent.ExecutionException:
>>>> java.lang.NullPointerException
>>>>         at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>>>         at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>>>         at
>>>> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>>>>         at
>>>> org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:66)
>>>>         at
>>>> org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:89)
>>>>         ... 7 more
>>>>     Caused by: java.lang.NullPointerException
>>>>         at
>>>> org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:954)
>>>>         at
>>>> org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:825)
>>>>         at
>>>> org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:888)
>>>>         at
>>>> org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:820)
>>>>         at
>>>> org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot.writeMappingsInKeyGroup(CopyOnWriteStateTableSnapshot.java:196)
>>>>         at
>>>> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:390)
>>>>         at
>>>> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:339)
>>>>         at
>>>> org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources.call(AbstractAsyncCallableWithResources.java:75)
>>>>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>         at
>>>> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
>>>>         at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
>>>>         ... 5 more
>>>>     [CIRCULAR REFERENCE:java.lang.NullPointerException]
>>>>
>>>> Any ideas on why I'm hitting this, especially when this (
>>>> https://issues.apache.org/jira/browse/FLINK-7756) says it has been
>>>> fixed in 1.4.2?
>>>>
>>>> On Sat, Nov 4, 2017 at 12:34 AM Federico D'Ambrosio <
>>>> federico.dambrosio@smartlab.ws> wrote:
>>>>
>>>>> Thank you very much for your steady response, Kostas!
>>>>>
>>>>> Cheers,
>>>>> Federico
>>>>>
>>>>> 2017-11-03 16:26 GMT+01:00 Kostas Kloudas <k.kloudas@data-artisans.com
>>>>> >:
>>>>>
>>>>>> Hi Federico,
>>>>>>
>>>>>> Thanks for trying it out!
>>>>>> Great to hear that your problem was fixed!
>>>>>>
>>>>>> The feature freeze for the release is going to be next week, and I
>>>>>> would expect 1 or 2 more weeks testing.
>>>>>> So I would say in 2.5 weeks. But this is of course subject to
>>>>>> potential issues we may find during testing.
>>>>>>
>>>>>> Cheers,
>>>>>> Kostas
>>>>>>
>>>>>> On Nov 3, 2017, at 4:22 PM, Federico D'Ambrosio <
>>>>>> federico.dambrosio@smartlab.ws> wrote:
>>>>>>
>>>>>> Hi Kostas,
>>>>>>
>>>>>> I just tried running the same job with 1.4-SNAPSHOT for 10 minutes
>>>>>> and it didn't crash, so that was the same underlying issue of the JIRA you
>>>>>> linked.
>>>>>>
>>>>>> Do you happen to know when the 1.4 stable release is expected?
>>>>>>
>>>>>> Thank you very much,
>>>>>> Federico
>>>>>>
>>>>>> 2017-11-03 15:25 GMT+01:00 Kostas Kloudas <
>>>>>> k.kloudas@data-artisans.com>:
>>>>>>
>>>>>>> Perfect! thanks a lot!
>>>>>>>
>>>>>>> Kostas
>>>>>>>
>>>>>>> On Nov 3, 2017, at 3:23 PM, Federico D'Ambrosio <
>>>>>>> federico.dambrosio@smartlab.ws> wrote:
>>>>>>>
>>>>>>> Hi Kostas,
>>>>>>>
>>>>>>> yes, I'm using 1.3.2. I'll try the current master and I'll get back
>>>>>>> to you.
>>>>>>>
>>>>>>> 2017-11-03 15:21 GMT+01:00 Kostas Kloudas <
>>>>>>> k.kloudas@data-artisans.com>:
>>>>>>>
>>>>>>>> Hi Federico,
>>>>>>>>
>>>>>>>> I assume that you are using Flink 1.3, right?
>>>>>>>>
>>>>>>>> In this case, in 1.4 we have fixed a bug that seems similar to your
>>>>>>>> case:
>>>>>>>> https://issues.apache.org/jira/browse/FLINK-7756
>>>>>>>>
>>>>>>>> Could you try the current master to see if it fixes your problem?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Kostas
>>>>>>>>
>>>>>>>> On Nov 3, 2017, at 3:12 PM, Federico D'Ambrosio <
>>>>>>>> federico.dambrosio@smartlab.ws> wrote:
>>>>>>>>
>>>>>>>>  Could not find id for
>>>>>>>> entry:
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Federico D'Ambrosio
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Federico D'Ambrosio
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Federico D'Ambrosio
>>>>>
>>>>
>>>>
>>>

Re: FlinkCEP, circular references and checkpointing failures

Posted by Shailesh Jain <sh...@stellapps.com>.
Thank you, Stefan. Any idea when we can expect the 1.6.3 release?
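
For anyone finding this thread later: the diagnosis quoted below (the
wrapper's duplicate() must also call duplicate() on the element serializer
it wraps) can be illustrated with a small self-contained sketch. Everything
in it is a hypothetical stand-in: Serializer, ScratchSerializer,
ShallowWrapper and DeepWrapper are made-up names, not Flink's TypeSerializer
or LockableTypeSerializer, and the actual change in FLINK-10816 may look
different. The assumption is that the element serializer keeps internal
mutable state (as Kryo-based serializers do), so two threads must not share
one instance; that would fit the failures showing up only with checkpointing
enabled, but I have not verified that part.

```java
public class DuplicateSketch {

    /** Hypothetical serializer contract (a stand-in, not Flink's TypeSerializer). */
    public interface Serializer<T> {
        T copy(T value);

        /** Should return an instance that is safe to use from another thread. */
        Serializer<T> duplicate();
    }

    /** Stateful element serializer: it reuses a scratch buffer, so it is not thread-safe. */
    public static final class ScratchSerializer implements Serializer<String> {
        private final StringBuilder scratch = new StringBuilder();

        @Override
        public String copy(String value) {
            scratch.setLength(0);   // mutable state that concurrent callers would corrupt
            scratch.append(value);
            return scratch.toString();
        }

        @Override
        public ScratchSerializer duplicate() {
            return new ScratchSerializer();   // fresh instance with its own buffer
        }
    }

    /** Shallow form: the new wrapper still shares the same element serializer. */
    public static final class ShallowWrapper<T> implements Serializer<T> {
        public final Serializer<T> element;

        public ShallowWrapper(Serializer<T> element) {
            this.element = element;
        }

        @Override
        public T copy(T value) {
            return element.copy(value);
        }

        @Override
        public ShallowWrapper<T> duplicate() {
            return new ShallowWrapper<>(element);   // nested serializer is NOT duplicated
        }
    }

    /** Deep form: the nested element serializer is duplicated as well. */
    public static final class DeepWrapper<T> implements Serializer<T> {
        public final Serializer<T> element;

        public DeepWrapper(Serializer<T> element) {
            this.element = element;
        }

        @Override
        public T copy(T value) {
            return element.copy(value);
        }

        @Override
        public DeepWrapper<T> duplicate() {
            return new DeepWrapper<>(element.duplicate());
        }
    }

    public static void main(String[] args) {
        ShallowWrapper<String> shallow = new ShallowWrapper<>(new ScratchSerializer());
        DeepWrapper<String> deep = new DeepWrapper<>(new ScratchSerializer());

        System.out.println("shallow duplicate shares element serializer: "
                + (shallow.element == shallow.duplicate().element));
        System.out.println("deep duplicate shares element serializer: "
                + (deep.element == deep.duplicate().element));
    }
}
```

With the shallow form, the task thread and any other thread holding a
"duplicate" end up driving the same stateful element serializer; with the
deep form each one gets its own.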

On Thu, Nov 8, 2018 at 4:28 PM Stefan Richter <s....@data-artisans.com>
wrote:

> Sure, it is already merged as FLINK-10816.
>
> Best,
> Stefan
>
> On 8. Nov 2018, at 11:53, Shailesh Jain <sh...@stellapps.com>
> wrote:
>
> Thanks a lot for looking into this issue Stefan.
>
> Could you please let me know the issue ID once you open it? It'll help me
> understand the problem better, and also I could do a quick test in our
> environment once the issue is resolved.
>
> Thanks,
> Shailesh
>
> On Wed, Nov 7, 2018, 10:46 PM Till Rohrmann <trohrmann@apache.org> wrote:
>
>> Really good finding Stefan!
>>
>> On Wed, Nov 7, 2018 at 5:28 PM Stefan Richter <
>> s.richter@data-artisans.com> wrote:
>>
>>> Hi,
>>>
>>> I think I can already spot the
>>> problem: LockableTypeSerializer.duplicate() is not properly implemented
>>> because it also has to call duplicate() on the element serialiser that is
>>> passed into the constructor of the new instance. I will open an issue and
>>> fix the problem.
>>>
>>> Best,
>>> Stefan
>>>
>>> On 7. Nov 2018, at 17:17, Till Rohrmann <tr...@apache.org> wrote:
>>>
>>> Hi Shailesh,
>>>
>>> could you maybe provide us with an example program which is able to
>>> reproduce this problem? This would help the community to better debug the
>>> problem. It does not look right and might point towards a bug in Flink. Thanks
>>> a lot!
>>>
>>> Cheers,
>>> Till
>>>
>>> On Tue, Oct 30, 2018 at 9:10 AM Dawid Wysakowicz <dw...@apache.org>
>>> wrote:
>>>
>>>> This is a problem with serializing your events using Kryo. I'm
>>>> adding Gordon to cc, as he was recently working with serializers. He might
>>>> give you more insight into what is going wrong.
>>>>
>>>> Best,
>>>>
>>>> Dawid
>>>> On 25/10/2018 05:41, Shailesh Jain wrote:
>>>>
>>>> Hi Dawid,
>>>>
>>>> I've upgraded to Flink 1.6.1 and rebased my changes against the tag
>>>> 1.6.1; the only commit on top of 1.6 is this:
>>>> https://github.com/jainshailesh/flink/commit/797e3c4af5b28263fd98fb79daaba97cabf3392c
>>>>
>>>> I ran two otherwise identical jobs (with and without checkpointing
>>>> enabled). I'm hitting an ArrayIndexOutOfBoundsException (and sometimes an NPE) *only
>>>> when checkpointing (HDFS backend) is enabled*, with the stack trace
>>>> below.
>>>>
>>>> I did see a similar problem with different operators here (
>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/IndexOutOfBoundsException-on-deserialization-after-updating-to-1-6-1-td23933.html).
>>>> Is this a known issue which is getting addressed?
>>>>
>>>> Any ideas on what could be causing this?
>>>>
>>>> Thanks,
>>>> Shailesh
>>>>
>>>>
>>>> 2018-10-24 17:04:13,365 INFO
>>>> org.apache.flink.runtime.taskmanager.Task                     -
>>>> SelectCepOperatorMixedTime (1/1) - SelectCepOperatorMixedTime (1/1)
>>>> (3d984b7919342a3886593401088ca2cd) switched from RUNNING to FAILED.
>>>> org.apache.flink.util.FlinkRuntimeException: Failure happened in filter
>>>> function.
>>>>         at
>>>> org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:731)
>>>>         at org.apache.flink.cep.nfa.NFA.computeNextStates(NFA.java:541)
>>>>         at org.apache.flink.cep.nfa.NFA.doProcess(NFA.java:284)
>>>>         at org.apache.flink.cep.nfa.NFA.process(NFA.java:220)
>>>>         at
>>>> org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.processEvent(AbstractKeyedCEPPatternOperator.java:379)
>>>>         at
>>>> org.apache.flink.cep.operator.AbstractKeyedCEPPatternMixedTimeApproachOperator.processElement(AbstractKeyedCEPPatternMixedTimeApproachOperator.java:45)
>>>>         at
>>>> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>>>>         at
>>>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>>>>         at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>>>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>>>>         at java.lang.Thread.run(Thread.java:748)
>>>> Caused by: org.apache.flink.util.WrappingRuntimeException:
>>>> java.lang.ArrayIndexOutOfBoundsException: -1
>>>>         at
>>>> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.lambda$materializeMatch$1(SharedBuffer.java:305)
>>>>         at java.util.HashMap.computeIfAbsent(HashMap.java:1127)
>>>>         at
>>>> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.materializeMatch(SharedBuffer.java:301)
>>>>         at
>>>> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.materializeMatch(SharedBuffer.java:291)
>>>>         at
>>>> org.apache.flink.cep.nfa.NFA$ConditionContext.getEventsForPattern(NFA.java:811)
>>>>         at
>>>> com.stellapps.contrakcep.flink.patterns.critical.AgitatorMalfunctionAfterChillingPattern$1.filter(AgitatorMalfunctionAfterChillingPattern.java:70)
>>>>         at
>>>> com.stellapps.contrakcep.flink.patterns.critical.AgitatorMalfunctionAfterChillingPattern$1.filter(AgitatorMalfunctionAfterChillingPattern.java:62)
>>>>         at
>>>> org.apache.flink.cep.nfa.NFA.checkFilterCondition(NFA.java:742)
>>>>         at
>>>> org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:716)
>>>>         ... 10 more
>>>> Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
>>>>         at com.esotericsoftware.kryo.util.IntArray.add(IntArray.java:61)
>>>>         at
>>>> com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:800)
>>>>         at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:655)
>>>>         at
>>>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:231)
>>>>         at
>>>> org.apache.flink.cep.nfa.sharedbuffer.Lockable$LockableTypeSerializer.copy(Lockable.java:120)
>>>>         at
>>>> org.apache.flink.cep.nfa.sharedbuffer.Lockable$LockableTypeSerializer.copy(Lockable.java:95)
>>>>         at
>>>> org.apache.flink.api.common.typeutils.base.MapSerializer.copy(MapSerializer.java:113)
>>>>         at
>>>> org.apache.flink.api.common.typeutils.base.MapSerializer.copy(MapSerializer.java:49)
>>>>         at
>>>> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:287)
>>>>         at
>>>> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:311)
>>>>         at
>>>> org.apache.flink.runtime.state.heap.HeapMapState.get(HeapMapState.java:85)
>>>>         at
>>>> org.apache.flink.runtime.state.UserFacingMapState.get(UserFacingMapState.java:47)
>>>>         at
>>>> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.lambda$materializeMatch$1(SharedBuffer.java:303)
>>>>         ... 18 more
>>>>
>>>> On Fri, Sep 28, 2018 at 11:00 AM Shailesh Jain <
>>>> shailesh.jain@stellapps.com> wrote:
>>>>
>>>>> Hi Dawid,
>>>>>
>>>>> Thanks for your time on this. The diff should have pointed out only
>>>>> the top 3 commits, but since it did not, it is possible I did not rebase my
>>>>> branch against 1.4.2 correctly. I'll check this out and get back to you if
>>>>> I hit the same issue again.
>>>>>
>>>>> Thanks again,
>>>>> Shailesh
>>>>>
>>>>> On Thu, Sep 27, 2018 at 1:00 PM Dawid Wysakowicz <
>>>>> dwysakowicz@apache.org> wrote:
>>>>>
>>>>>> Hi Shailesh,
>>>>>>
>>>>>> I am afraid it is going to be hard to help you, as this branch differs
>>>>>> significantly from the 1.4.2 release (I diffed your branch against
>>>>>> tag/release-1.4.2). Moreover, the code in the branch you've provided still
>>>>>> does not correspond to the line numbers in the exception you posted
>>>>>> previously. Could you check whether the problem also occurs on vanilla Flink?
>>>>>>
>>>>>> Best,
>>>>>>
>>>>>> Dawid
>>>>>>
>>>>>> On 27/09/18 08:22, Shailesh Jain wrote:
>>>>>>
>>>>>> Hi Dawid,
>>>>>>
>>>>>> Yes, it is version 1.4.2. We are running vanilla flink, but have
>>>>>> added a couple of changes in the CEP operator specifically (top 3 commits
>>>>>> here: https://github.com/jainshailesh/flink/commits/poc_on_1.4.2).
>>>>>> The changes I've made to the CEP operators do not touch the checkpointing
>>>>>> path; they only overload the operator for a specific way of handling event time.
>>>>>>
>>>>>> We are hitting this in production, so I'm not sure it'll be feasible
>>>>>> to move to 1.6.0 immediately, but eventually yes.
>>>>>>
>>>>>> Thanks,
>>>>>> Shailesh
>>>>>>
>>>>>> On Wed, Sep 26, 2018 at 5:44 PM Dawid Wysakowicz <
>>>>>> dwysakowicz@apache.org> wrote:
>>>>>>
>>>>>>> Hi Shailesh,
>>>>>>>
>>>>>>> Are you sure you are using version 1.4.2? Do you run vanilla
>>>>>>> Flink, or have you introduced some changes? I am asking because the line
>>>>>>> numbers in the stack trace do not align with the source code for 1.4.2.
>>>>>>>
>>>>>>> Also, it is a different exception from the one in the issue you've
>>>>>>> linked, so if it is a problem, then it is definitely a different one. Lastly,
>>>>>>> I would recommend upgrading to the newest version, as we have rewritten
>>>>>>> the SharedBuffer implementation in 1.6.0.
>>>>>>>
>>>>>>> Best,
>>>>>>>
>>>>>>> Dawid
>>>>>>>
>>>>>>> On 26/09/18 13:50, Shailesh Jain wrote:
>>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I think I've hit this same issue on a 3 node standalone cluster
>>>>>>> (1.4.2) using HDFS (2.8.4) as state backend.
>>>>>>>
>>>>>>> 2018-09-26 17:07:39,370 INFO
>>>>>>> org.apache.flink.runtime.taskmanager.Task                     - Attempting
>>>>>>> to fail task externally SelectCepOperator (1/1)
>>>>>>> (3bec4aa1ef2226c4e0c5ff7b3860d340).
>>>>>>> 2018-09-26 17:07:39,370 INFO
>>>>>>> org.apache.flink.runtime.taskmanager.Task                     -
>>>>>>> SelectCepOperator (1/1) (3bec4aa1ef2226c4e0c5ff7b3860d340) switched from
>>>>>>> RUNNING to FAILED.
>>>>>>> AsynchronousException{java.lang.Exception: Could not materialize
>>>>>>> checkpoint 6 for operator SelectCepOperator (1/1).}
>>>>>>>     at
>>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:948)
>>>>>>>     at
>>>>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>>>     at
>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>>>     at
>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>>>> Caused by: java.lang.Exception: Could not materialize checkpoint 6
>>>>>>> for operator SelectCepOperator (1/1).
>>>>>>>     ... 6 more
>>>>>>> Caused by: java.util.concurrent.ExecutionException:
>>>>>>> java.lang.NullPointerException
>>>>>>>     at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>>>>>>     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>>>>>>     at
>>>>>>> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>>>>>>>     at
>>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
>>>>>>>     ... 5 more
>>>>>>>     Suppressed: java.lang.Exception: Could not properly cancel
>>>>>>> managed keyed state future.
>>>>>>>         at
>>>>>>> org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:91)
>>>>>>>         at
>>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:976)
>>>>>>>         at
>>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:939)
>>>>>>>         ... 5 more
>>>>>>>     Caused by: java.util.concurrent.ExecutionException:
>>>>>>> java.lang.NullPointerException
>>>>>>>         at
>>>>>>> java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>>>>>>         at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>>>>>>         at
>>>>>>> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>>>>>>>         at
>>>>>>> org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:66)
>>>>>>>         at
>>>>>>> org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:89)
>>>>>>>         ... 7 more
>>>>>>>     Caused by: java.lang.NullPointerException
>>>>>>>         at
>>>>>>> org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:954)
>>>>>>>         at
>>>>>>> org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:825)
>>>>>>>         at
>>>>>>> org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:888)
>>>>>>>         at
>>>>>>> org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:820)
>>>>>>>         at
>>>>>>> org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot.writeMappingsInKeyGroup(CopyOnWriteStateTableSnapshot.java:196)
>>>>>>>         at
>>>>>>> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:390)
>>>>>>>         at
>>>>>>> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:339)
>>>>>>>         at
>>>>>>> org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources.call(AbstractAsyncCallableWithResources.java:75)
>>>>>>>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>>>         at
>>>>>>> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
>>>>>>>         at
>>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
>>>>>>>         ... 5 more
>>>>>>>     [CIRCULAR REFERENCE:java.lang.NullPointerException]
>>>>>>>
>>>>>>> Any ideas on why I'm hitting this, especially when this issue (
>>>>>>> https://issues.apache.org/jira/browse/FLINK-7756) says it has been
>>>>>>> fixed in 1.4.2?
>>>>>>>
>>>>>>> On Sat, Nov 4, 2017 at 12:34 AM Federico D'Ambrosio <
>>>>>>> federico.dambrosio@smartlab.ws> wrote:
>>>>>>>
>>>>>>>> Thank you very much for your steady response, Kostas!
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Federico
>>>>>>>>
>>>>>>>> 2017-11-03 16:26 GMT+01:00 Kostas Kloudas <
>>>>>>>> k.kloudas@data-artisans.com>:
>>>>>>>>
>>>>>>>>> Hi Federico,
>>>>>>>>>
>>>>>>>>> Thanks for trying it out!
>>>>>>>>> Great to hear that your problem was fixed!
>>>>>>>>>
>>>>>>>>> The feature freeze for the release is going to be next week, and I
>>>>>>>>> would expect 1 or 2 more weeks testing.
>>>>>>>>> So I would say in 2.5 weeks. But this is of course subject to
>>>>>>>>> potential issues we may find during testing.
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Kostas
>>>>>>>>>
>>>>>>>>> On Nov 3, 2017, at 4:22 PM, Federico D'Ambrosio <
>>>>>>>>> federico.dambrosio@smartlab.ws> wrote:
>>>>>>>>>
>>>>>>>>> Hi Kostas,
>>>>>>>>>
>>>>>>>>> I just tried running the same job with 1.4-SNAPSHOT for 10 minutes
>>>>>>>>> and it didn't crash, so it was the same underlying issue as the JIRA you
>>>>>>>>> linked.
>>>>>>>>>
>>>>>>>>> Do you happen to know when the 1.4 stable release is expected?
>>>>>>>>>
>>>>>>>>> Thank you very much,
>>>>>>>>> Federico
>>>>>>>>>
>>>>>>>>> 2017-11-03 15:25 GMT+01:00 Kostas Kloudas <
>>>>>>>>> k.kloudas@data-artisans.com>:
>>>>>>>>>
>>>>>>>>>> Perfect! thanks a lot!
>>>>>>>>>>
>>>>>>>>>> Kostas
>>>>>>>>>>
>>>>>>>>>> On Nov 3, 2017, at 3:23 PM, Federico D'Ambrosio <
>>>>>>>>>> federico.dambrosio@smartlab.ws> wrote:
>>>>>>>>>>
>>>>>>>>>> Hi Kostas,
>>>>>>>>>>
>>>>>>>>>> yes, I'm using 1.3.2. I'll try the current master and I'll get
>>>>>>>>>> back to you.
>>>>>>>>>>
>>>>>>>>>> 2017-11-03 15:21 GMT+01:00 Kostas Kloudas <
>>>>>>>>>> k.kloudas@data-artisans.com>:
>>>>>>>>>>
>>>>>>>>>>> Hi Federico,
>>>>>>>>>>>
>>>>>>>>>>> I assume that you are using Flink 1.3, right?
>>>>>>>>>>>
>>>>>>>>>>> In this case, in 1.4 we have fixed a bug that seems similar to
>>>>>>>>>>> your case:
>>>>>>>>>>> https://issues.apache.org/jira/browse/FLINK-7756
>>>>>>>>>>>
>>>>>>>>>>> Could you try the current master to see if it fixes your problem?
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Kostas
>>>>>>>>>>>
>>>>>>>>>>> On Nov 3, 2017, at 3:12 PM, Federico D'Ambrosio <
>>>>>>>>>>> federico.dambrosio@smartlab.ws> wrote:
>>>>>>>>>>>
>>>>>>>>>>>  Could not find id for
>>>>>>>>>>> entry:

Re: FlinkCEP, circular references and checkpointing failures

Posted by Stefan Richter <s....@data-artisans.com>.
Sure, it is already merged as FLINK-10816.

Best,
Stefan
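
For readers of the archive, the duplicate() problem behind FLINK-10816 can be illustrated with a small, self-contained sketch. It deliberately does not use Flink's classes: SketchSerializer, StatefulSerializer, BrokenWrapper, FixedWrapper and DuplicateSketch are hypothetical names invented for this illustration, and the actual patch may well look different. The only point shown is that a wrapping serializer has to duplicate the serializer it wraps; otherwise two "duplicates" still share one stateful inner instance, which is unsafe when that instance is not thread-safe (as is the case for Kryo-based serializers) and is used from more than one thread.

```java
/** Entry point of the sketch; prints whether each wrapper shares its inner serializer. */
final class DuplicateSketch {
    public static void main(String[] args) {
        BrokenWrapper<String> broken = new BrokenWrapper<>(new StatefulSerializer());
        FixedWrapper<String> fixed = new FixedWrapper<>(new StatefulSerializer());

        System.out.println("broken shares element: " + (broken.duplicate().element == broken.element));
        System.out.println("fixed shares element:  " + (fixed.duplicate().element == fixed.element));
    }
}

/** Hypothetical, minimal serializer contract; NOT Flink's TypeSerializer. */
interface SketchSerializer<T> {
    T copy(T value);

    /** Must return an instance that is safe to use from another thread. */
    SketchSerializer<T> duplicate();
}

/** Stateful serializer: it reuses a scratch buffer, so one instance must not be shared across threads. */
final class StatefulSerializer implements SketchSerializer<String> {
    private final StringBuilder scratch = new StringBuilder();

    @Override
    public String copy(String value) {
        scratch.setLength(0);
        scratch.append(value);
        return scratch.toString();
    }

    @Override
    public StatefulSerializer duplicate() {
        return new StatefulSerializer(); // fresh instance with its own scratch buffer
    }
}

/** Wrapper whose duplicate() reuses the wrapped serializer (the problematic pattern). */
final class BrokenWrapper<T> implements SketchSerializer<T> {
    final SketchSerializer<T> element;

    BrokenWrapper(SketchSerializer<T> element) {
        this.element = element;
    }

    @Override
    public T copy(T value) {
        return element.copy(value);
    }

    @Override
    public BrokenWrapper<T> duplicate() {
        return new BrokenWrapper<>(element); // both wrappers still share one stateful instance
    }
}

/** Wrapper whose duplicate() also duplicates the wrapped serializer. */
final class FixedWrapper<T> implements SketchSerializer<T> {
    final SketchSerializer<T> element;

    FixedWrapper(SketchSerializer<T> element) {
        this.element = element;
    }

    @Override
    public T copy(T value) {
        return element.copy(value);
    }

    @Override
    public FixedWrapper<T> duplicate() {
        return new FixedWrapper<>(element.duplicate()); // the new wrapper gets its own inner instance
    }
}
```

Running DuplicateSketch prints "broken shares element: true" and "fixed shares element:  false". In the broken variant, a copy() made through the duplicate would still go through the same scratch buffer as the original.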

> On 8. Nov 2018, at 11:53, Shailesh Jain <sh...@stellapps.com> wrote:
> 
> Thanks a lot for looking into this issue Stefan.
> 
> Could you please let me know the issue ID once you open it? It'll help me understand the problem better, and also I could do a quick test in our environment once the issue is resolved.
> 
> Thanks,
> Shailesh
> 
> On Wed, Nov 7, 2018, 10:46 PM Till Rohrmann <trohrmann@apache.org> wrote:
> Really good finding Stefan!
> 
> On Wed, Nov 7, 2018 at 5:28 PM Stefan Richter <s.richter@data-artisans.com> wrote:
> Hi,
> 
> I think I can already spot the problem: LockableTypeSerializer.duplicate() is not properly implemented because it also has to call duplicate() on the element serialiser that is passed into the constructor of the new instance. I will open an issue and fix the problem.
> 
> Best,
> Stefan
> 
>> On 7. Nov 2018, at 17:17, Till Rohrmann <trohrmann@apache.org> wrote:
>> 
>> Hi Shailesh,
>> 
>> could you maybe provide us with an example program which is able to reproduce this problem? This would help the community to better debug the problem. It does not look right and might point towards a bug in Flink. Thanks a lot!
>> 
>> Cheers,
>> Till
>> 
>> On Tue, Oct 30, 2018 at 9:10 AM Dawid Wysakowicz <dwysakowicz@apache.org> wrote:
>> This is some problem with serializing your events using Kryo. I'm adding Gordon to cc, as he was recently working with serializers. He might give you more insight into what is going wrong.
>> 
>> Best,
>> 
>> Dawid
>> 
>> On 25/10/2018 05:41, Shailesh Jain wrote:
>>> Hi Dawid,
>>> 
>>> I've upgraded to Flink 1.6.1 and rebased my changes against the tag 1.6.1; the only commit on top of 1.6 is this: https://github.com/jainshailesh/flink/commit/797e3c4af5b28263fd98fb79daaba97cabf3392c
>>> 
>>> I ran two separate identical jobs (with and without checkpointing enabled). I'm hitting an ArrayIndexOutOfBoundsException (and sometimes an NPE) only when checkpointing (HDFS backend) is enabled, with the stack trace below.
>>> 
>>> I did see a similar problem with different operators here (http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/IndexOutOfBoundsException-on-deserialization-after-updating-to-1-6-1-td23933.html). Is this a known issue which is getting addressed?
>>> 
>>> Any ideas on what could be causing this?
>>> 
>>> Thanks,
>>> Shailesh
>>> 
>>> 
>>> 2018-10-24 17:04:13,365 INFO  org.apache.flink.runtime.taskmanager.Task                     - SelectCepOperatorMixedTime (1/1) - SelectCepOperatorMixedTime (1/1) (3d984b7919342a3886593401088ca2cd) switched from RUNNING to FAILED.
>>> org.apache.flink.util.FlinkRuntimeException: Failure happened in filter function.
>>>         at org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:731)
>>>         at org.apache.flink.cep.nfa.NFA.computeNextStates(NFA.java:541)
>>>         at org.apache.flink.cep.nfa.NFA.doProcess(NFA.java:284)
>>>         at org.apache.flink.cep.nfa.NFA.process(NFA.java:220)
>>>         at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.processEvent(AbstractKeyedCEPPatternOperator.java:379)
>>>         at org.apache.flink.cep.operator.AbstractKeyedCEPPatternMixedTimeApproachOperator.processElement(AbstractKeyedCEPPatternMixedTimeApproachOperator.java:45)
>>>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>>>         at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>>>         at java.lang.Thread.run(Thread.java:748)
>>> Caused by: org.apache.flink.util.WrappingRuntimeException: java.lang.ArrayIndexOutOfBoundsException: -1
>>>         at org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.lambda$materializeMatch$1(SharedBuffer.java:305)
>>>         at java.util.HashMap.computeIfAbsent(HashMap.java:1127)
>>>         at org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.materializeMatch(SharedBuffer.java:301)
>>>         at org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.materializeMatch(SharedBuffer.java:291)
>>>         at org.apache.flink.cep.nfa.NFA$ConditionContext.getEventsForPattern(NFA.java:811)
>>>         at com.stellapps.contrakcep.flink.patterns.critical.AgitatorMalfunctionAfterChillingPattern$1.filter(AgitatorMalfunctionAfterChillingPattern.java:70)
>>>         at com.stellapps.contrakcep.flink.patterns.critical.AgitatorMalfunctionAfterChillingPattern$1.filter(AgitatorMalfunctionAfterChillingPattern.java:62)
>>>         at org.apache.flink.cep.nfa.NFA.checkFilterCondition(NFA.java:742)
>>>         at org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:716)
>>>         ... 10 more
>>> Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
>>>         at com.esotericsoftware.kryo.util.IntArray.add(IntArray.java:61)
>>>         at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:800)
>>>         at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:655)
>>>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:231)
>>>         at org.apache.flink.cep.nfa.sharedbuffer.Lockable$LockableTypeSerializer.copy(Lockable.java:120)
>>>         at org.apache.flink.cep.nfa.sharedbuffer.Lockable$LockableTypeSerializer.copy(Lockable.java:95)
>>>         at org.apache.flink.api.common.typeutils.base.MapSerializer.copy(MapSerializer.java:113)
>>>         at org.apache.flink.api.common.typeutils.base.MapSerializer.copy(MapSerializer.java:49)
>>>         at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:287)
>>>         at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:311)
>>>         at org.apache.flink.runtime.state.heap.HeapMapState.get(HeapMapState.java:85)
>>>         at org.apache.flink.runtime.state.UserFacingMapState.get(UserFacingMapState.java:47)
>>>         at org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.lambda$materializeMatch$1(SharedBuffer.java:303)
>>>         ... 18 more
>>> 
>>> On Fri, Sep 28, 2018 at 11:00 AM Shailesh Jain <shailesh.jain@stellapps.com> wrote:
>>> Hi Dawid,
>>> 
>>> Thanks for your time on this. The diff should have pointed out only the top 3 commits, but since it did not, it is possible I did not rebase my branch against 1.4.2 correctly. I'll check this out and get back to you if I hit the same issue again.
>>> 
>>> Thanks again,
>>> Shailesh
>>> 
>>> On Thu, Sep 27, 2018 at 1:00 PM Dawid Wysakowicz <dwysakowicz@apache.org> wrote:
>>> Hi Shailesh,
>>> 
>>> I am afraid it is going to be hard to help you, as this branch differs significantly from the 1.4.2 release (I diffed your branch against tag/release-1.4.2). Moreover, the code in the branch you've provided still does not correspond to the line numbers in the exception you posted previously. Could you check whether the problem also occurs on vanilla Flink?
>>> 
>>> Best,
>>> 
>>> Dawid
>>> 
>>> 
>>> On 27/09/18 08:22, Shailesh Jain wrote:
>>>> Hi Dawid,
>>>> 
>>>> Yes, it is version 1.4.2. We are running vanilla flink, but have added a couple of changes in the CEP operator specifically (top 3 commits here: https://github.com/jainshailesh/flink/commits/poc_on_1.4.2). The changes I've made to the CEP operators do not touch the checkpointing path; they only overload the operator for a specific way of handling event time.
>>>> 
>>>> We are hitting this in production, so I'm not sure it'll be feasible to move to 1.6.0 immediately, but eventually yes.
>>>> 
>>>> Thanks,
>>>> Shailesh
>>>> 
>>>> On Wed, Sep 26, 2018 at 5:44 PM Dawid Wysakowicz <dwysakowicz@apache.org> wrote:
>>>> Hi Shailesh,
>>>> 
>>>> Are you sure you are using version 1.4.2? Do you run vanilla Flink, or have you introduced some changes? I am asking because the line numbers in the stack trace do not align with the source code for 1.4.2.
>>>> 
>>>> Also, it is a different exception from the one in the issue you've linked, so if it is a problem, then it is definitely a different one. Lastly, I would recommend upgrading to the newest version, as we have rewritten the SharedBuffer implementation in 1.6.0.
>>>> 
>>>> Best,
>>>> 
>>>> Dawid
>>>> 
>>>> 
>>>> On 26/09/18 13:50, Shailesh Jain wrote:
>>>>> Hi,
>>>>> 
>>>>> I think I've hit this same issue on a 3 node standalone cluster (1.4.2) using HDFS (2.8.4) as state backend.
>>>>> 
>>>>> 2018-09-26 17:07:39,370 INFO  org.apache.flink.runtime.taskmanager.Task                     - Attempting to fail task externally SelectCepOperator (1/1) (3bec4aa1ef2226c4e0c5ff7b3860d340).
>>>>> 2018-09-26 17:07:39,370 INFO  org.apache.flink.runtime.taskmanager.Task                     - SelectCepOperator (1/1) (3bec4aa1ef2226c4e0c5ff7b3860d340) switched from RUNNING to FAILED.
>>>>> AsynchronousException{java.lang.Exception: Could not materialize checkpoint 6 for operator SelectCepOperator (1/1).}
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:948)
>>>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>> Caused by: java.lang.Exception: Could not materialize checkpoint 6 for operator SelectCepOperator (1/1).
>>>>>     ... 6 more
>>>>> Caused by: java.util.concurrent.ExecutionException: java.lang.NullPointerException
>>>>>     at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>>>>     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>>>>     at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
>>>>>     ... 5 more
>>>>>     Suppressed: java.lang.Exception: Could not properly cancel managed keyed state future.
>>>>>         at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:91)
>>>>>         at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:976)
>>>>>         at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:939)
>>>>>         ... 5 more
>>>>>     Caused by: java.util.concurrent.ExecutionException: java.lang.NullPointerException
>>>>>         at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>>>>         at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>>>>         at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>>>>>         at org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:66)
>>>>>         at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:89)
>>>>>         ... 7 more
>>>>>     Caused by: java.lang.NullPointerException
>>>>>         at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:954)
>>>>>         at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:825)
>>>>>         at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:888)
>>>>>         at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:820)
>>>>>         at org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot.writeMappingsInKeyGroup(CopyOnWriteStateTableSnapshot.java:196)
>>>>>         at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:390)
>>>>>         at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:339)
>>>>>         at org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources.call(AbstractAsyncCallableWithResources.java:75)
>>>>>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>         at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
>>>>>         at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
>>>>>         ... 5 more
>>>>>     [CIRCULAR REFERENCE:java.lang.NullPointerException]
>>>>> 
>>>>> Any ideas on why I'm hitting this, especially when this issue (https://issues.apache.org/jira/browse/FLINK-7756) says it has been fixed in 1.4.2?
>>>>> 
>>>>> On Sat, Nov 4, 2017 at 12:34 AM Federico D'Ambrosio <federico.dambrosio@smartlab.ws> wrote:
>>>>> Thank you very much for your steady response, Kostas!
>>>>> 
>>>>> Cheers,
>>>>> Federico
>>>>> 
>>>>> 2017-11-03 16:26 GMT+01:00 Kostas Kloudas <k.kloudas@data-artisans.com>:
>>>>> Hi Federico,
>>>>> 
>>>>> Thanks for trying it out! 
>>>>> Great to hear that your problem was fixed!
>>>>> 
>>>>> The feature freeze for the release is going to be next week, and I would expect 1 or 2 more weeks testing.
>>>>> So I would say in 2.5 weeks. But this is of course subject to potential issues we may find during testing.
>>>>> 
>>>>> Cheers,
>>>>> Kostas
>>>>> 
>>>>>> On Nov 3, 2017, at 4:22 PM, Federico D'Ambrosio <federico.dambrosio@smartlab.ws> wrote:
>>>>>> 
>>>>>> Hi Kostas,
>>>>>> 
>>>>>> I just tried running the same job with 1.4-SNAPSHOT for 10 minutes and it didn't crash, so it was the same underlying issue as the JIRA you linked.
>>>>>> 
>>>>>> Do you happen to know when the 1.4 stable release is expected?
>>>>>> 
>>>>>> Thank you very much,
>>>>>> Federico
>>>>>> 
>>>>>> 2017-11-03 15:25 GMT+01:00 Kostas Kloudas <k.kloudas@data-artisans.com>:
>>>>>> Perfect! thanks a lot!
>>>>>> 
>>>>>> Kostas
>>>>>> 
>>>>>>> On Nov 3, 2017, at 3:23 PM, Federico D'Ambrosio <federico.dambrosio@smartlab.ws> wrote:
>>>>>>> 
>>>>>>> Hi Kostas, 
>>>>>>> 
>>>>>>> yes, I'm using 1.3.2. I'll try the current master and I'll get back to you.
>>>>>>> 
>>>>>>> 2017-11-03 15:21 GMT+01:00 Kostas Kloudas <k.kloudas@data-artisans.com>:
>>>>>>> Hi Federico,
>>>>>>> 
>>>>>>> I assume that you are using Flink 1.3, right?
>>>>>>> 
>>>>>>> In this case, in 1.4 we have fixed a bug that seems similar to your case:
>>>>>>> https://issues.apache.org/jira/browse/FLINK-7756
>>>>>>> 
>>>>>>> Could you try the current master to see if it fixes your problem?
>>>>>>> 
>>>>>>> Thanks,
>>>>>>> Kostas
>>>>>>> 
>>>>>>>> On Nov 3, 2017, at 3:12 PM, Federico D'Ambrosio <federico.dambrosio@smartlab.ws> wrote:
>>>>>>>> 
>>>>>>>>  Could not find id for entry:


Re: FlinkCEP, circular references and checkpointing failures

Posted by Shailesh Jain <sh...@stellapps.com>.
Thanks a lot for looking into this issue Stefan.

Could you please let me know the issue ID once you open it? It'll help me
understand the problem better, and also I could do a quick test in our
environment once the issue is resolved.

Thanks,
Shailesh

On Wed, Nov 7, 2018, 10:46 PM Till Rohrmann <trohrmann@apache.org> wrote:

> Really good finding Stefan!
>
> On Wed, Nov 7, 2018 at 5:28 PM Stefan Richter <s....@data-artisans.com>
> wrote:
>
>> Hi,
>>
>> I think I can already spot the
>> problem: LockableTypeSerializer.duplicate() is not properly implemented
>> because it also has to call duplicate() on the element serialiser that is
>> passed into the constructor of the new instance. I will open an issue and
>> fix the problem.
>>
>> Best,
>> Stefan
>>
>> On 7. Nov 2018, at 17:17, Till Rohrmann <tr...@apache.org> wrote:
>>
>> Hi Shailesh,
>>
>> could you maybe provide us with an example program which is able to
>> reproduce this problem? This would help the community to better debug the
>> problem. It does not look right and might point towards a bug in Flink. Thanks
>> a lot!
>>
>> Cheers,
>> Till
>>
>> On Tue, Oct 30, 2018 at 9:10 AM Dawid Wysakowicz <dw...@apache.org>
>> wrote:
>>
>>> This is some problem with serializing your events using Kryo. I'm adding
>>> Gordon to cc, as he was recently working with serializers. He might give
>>> you more insight into what is going wrong.
>>>
>>> Best,
>>>
>>> Dawid
>>> On 25/10/2018 05:41, Shailesh Jain wrote:
>>>
>>> Hi Dawid,
>>>
>>> I've upgraded to Flink 1.6.1 and rebased my changes against the tag
>>> 1.6.1; the only commit on top of 1.6 is this:
>>> https://github.com/jainshailesh/flink/commit/797e3c4af5b28263fd98fb79daaba97cabf3392c
>>>
>>> I ran two separate identical jobs (with and without checkpointing
>>> enabled). I'm hitting an ArrayIndexOutOfBoundsException (and sometimes an NPE) *only
>>> when checkpointing (HDFS backend) is enabled*, with the stack trace
>>> below.
>>>
>>> I did see a similar problem with different operators here (
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/IndexOutOfBoundsException-on-deserialization-after-updating-to-1-6-1-td23933.html).
>>> Is this a known issue which is getting addressed?
>>>
>>> Any ideas on what could be causing this?
>>>
>>> Thanks,
>>> Shailesh
>>>
>>>
>>> 2018-10-24 17:04:13,365 INFO
>>> org.apache.flink.runtime.taskmanager.Task                     -
>>> SelectCepOperatorMixedTime (1/1) - SelectCepOperatorMixedTime (1/1)
>>> (3d984b7919342a3886593401088ca2cd) switched from RUNNING to FAILED.
>>> org.apache.flink.util.FlinkRuntimeException: Failure happened in filter
>>> function.
>>>         at org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:731)
>>>         at org.apache.flink.cep.nfa.NFA.computeNextStates(NFA.java:541)
>>>         at org.apache.flink.cep.nfa.NFA.doProcess(NFA.java:284)
>>>         at org.apache.flink.cep.nfa.NFA.process(NFA.java:220)
>>>         at
>>> org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.processEvent(AbstractKeyedCEPPatternOperator.java:379)
>>>         at
>>> org.apache.flink.cep.operator.AbstractKeyedCEPPatternMixedTimeApproachOperator.processElement(AbstractKeyedCEPPatternMixedTimeApproachOperator.java:45)
>>>         at
>>> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>>>         at
>>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>>>         at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>>>         at java.lang.Thread.run(Thread.java:748)
>>> Caused by: org.apache.flink.util.WrappingRuntimeException:
>>> java.lang.ArrayIndexOutOfBoundsException: -1
>>>         at
>>> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.lambda$materializeMatch$1(SharedBuffer.java:305)
>>>         at java.util.HashMap.computeIfAbsent(HashMap.java:1127)
>>>         at
>>> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.materializeMatch(SharedBuffer.java:301)
>>>         at
>>> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.materializeMatch(SharedBuffer.java:291)
>>>         at
>>> org.apache.flink.cep.nfa.NFA$ConditionContext.getEventsForPattern(NFA.java:811)
>>>         at
>>> com.stellapps.contrakcep.flink.patterns.critical.AgitatorMalfunctionAfterChillingPattern$1.filter(AgitatorMalfunctionAfterChillingPattern.java:70)
>>>         at
>>> com.stellapps.contrakcep.flink.patterns.critical.AgitatorMalfunctionAfterChillingPattern$1.filter(AgitatorMalfunctionAfterChillingPattern.java:62)
>>>         at
>>> org.apache.flink.cep.nfa.NFA.checkFilterCondition(NFA.java:742)
>>>         at org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:716)
>>>         ... 10 more
>>> Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
>>>         at com.esotericsoftware.kryo.util.IntArray.add(IntArray.java:61)
>>>         at
>>> com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:800)
>>>         at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:655)
>>>         at
>>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:231)
>>>         at
>>> org.apache.flink.cep.nfa.sharedbuffer.Lockable$LockableTypeSerializer.copy(Lockable.java:120)
>>>         at
>>> org.apache.flink.cep.nfa.sharedbuffer.Lockable$LockableTypeSerializer.copy(Lockable.java:95)
>>>         at
>>> org.apache.flink.api.common.typeutils.base.MapSerializer.copy(MapSerializer.java:113)
>>>         at
>>> org.apache.flink.api.common.typeutils.base.MapSerializer.copy(MapSerializer.java:49)
>>>         at
>>> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:287)
>>>         at
>>> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:311)
>>>         at
>>> org.apache.flink.runtime.state.heap.HeapMapState.get(HeapMapState.java:85)
>>>         at
>>> org.apache.flink.runtime.state.UserFacingMapState.get(UserFacingMapState.java:47)
>>>         at
>>> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.lambda$materializeMatch$1(SharedBuffer.java:303)
>>>         ... 18 more
>>>
>>> On Fri, Sep 28, 2018 at 11:00 AM Shailesh Jain <
>>> shailesh.jain@stellapps.com> wrote:
>>>
>>>> Hi Dawid,
>>>>
>>>> Thanks for your time on this. The diff should have pointed out only the
>>>> top 3 commits, but since it did not, it is possible I did not rebase my
>>>> branch against 1.4.2 correctly. I'll check this out and get back to you if
>>>> I hit the same issue again.
>>>>
>>>> Thanks again,
>>>> Shailesh
>>>>
>>>> On Thu, Sep 27, 2018 at 1:00 PM Dawid Wysakowicz <
>>>> dwysakowicz@apache.org> wrote:
>>>>
>>>>> Hi Shailesh,
>>>>>
>>>>> I am afraid it is going to be hard to help you, as this branch differs
>>>>> significantly from the 1.4.2 release (I've diffed your branch against
>>>>> tag/release-1.4.2). Moreover, the code in the branch you've provided still
>>>>> does not correspond to the lines in the exception you posted previously.
>>>>> Could you check whether the problem occurs on vanilla Flink as well?
>>>>>
>>>>> Best,
>>>>>
>>>>> Dawid
>>>>>
>>>>> On 27/09/18 08:22, Shailesh Jain wrote:
>>>>>
>>>>> Hi Dawid,
>>>>>
>>>>> Yes, it is version 1.4.2. We are running vanilla flink, but have added
>>>>> a couple of changes in the CEP operator specifically (top 3 commits here:
>>>>> https://github.com/jainshailesh/flink/commits/poc_on_1.4.2). Changes
>>>>> I've made to CEP operators do not touch the checkpointing path, just
>>>>> overloading the operator for a specific way of handling event time.
>>>>>
>>>>> We are hitting this in production, so I'm not sure it'll be feasible
>>>>> to move to 1.6.0 immediately, but eventually yes.
>>>>>
>>>>> Thanks,
>>>>> Shailesh
>>>>>
>>>>> On Wed, Sep 26, 2018 at 5:44 PM Dawid Wysakowicz <
>>>>> dwysakowicz@apache.org> wrote:
>>>>>
>>>>>> Hi Shailesh,
>>>>>>
>>>>>> Are you sure you are using version 1.4.2? Are you running vanilla Flink,
>>>>>> or have you introduced some changes? I am asking because the line numbers
>>>>>> in the stack trace do not align with the source code for 1.4.2.
>>>>>>
>>>>>> Also, it is a different exception from the one in the issue you've
>>>>>> linked, so if it is a problem then it is definitely a different one. Lastly,
>>>>>> I would recommend upgrading to the newest version, as we rewrote
>>>>>> the SharedBuffer implementation in 1.6.0.
>>>>>>
>>>>>> Best,
>>>>>>
>>>>>> Dawid
>>>>>>
>>>>>> On 26/09/18 13:50, Shailesh Jain wrote:
>>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I think I've hit this same issue on a 3 node standalone cluster
>>>>>> (1.4.2) using HDFS (2.8.4) as state backend.
>>>>>>
>>>>>> 2018-09-26 17:07:39,370 INFO
>>>>>> org.apache.flink.runtime.taskmanager.Task                     - Attempting
>>>>>> to fail task externally SelectCepOperator (1/1)
>>>>>> (3bec4aa1ef2226c4e0c5ff7b3860d340).
>>>>>> 2018-09-26 17:07:39,370 INFO
>>>>>> org.apache.flink.runtime.taskmanager.Task                     -
>>>>>> SelectCepOperator (1/1) (3bec4aa1ef2226c4e0c5ff7b3860d340) switched from
>>>>>> RUNNING to FAILED.
>>>>>> AsynchronousException{java.lang.Exception: Could not materialize
>>>>>> checkpoint 6 for operator SelectCepOperator (1/1).}
>>>>>>     at
>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:948)
>>>>>>     at
>>>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>>     at
>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>>     at
>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>>> Caused by: java.lang.Exception: Could not materialize checkpoint 6
>>>>>> for operator SelectCepOperator (1/1).
>>>>>>     ... 6 more
>>>>>> Caused by: java.util.concurrent.ExecutionException:
>>>>>> java.lang.NullPointerException
>>>>>>     at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>>>>>     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>>>>>     at
>>>>>> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>>>>>>     at
>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
>>>>>>     ... 5 more
>>>>>>     Suppressed: java.lang.Exception: Could not properly cancel
>>>>>> managed keyed state future.
>>>>>>         at
>>>>>> org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:91)
>>>>>>         at
>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:976)
>>>>>>         at
>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:939)
>>>>>>         ... 5 more
>>>>>>     Caused by: java.util.concurrent.ExecutionException:
>>>>>> java.lang.NullPointerException
>>>>>>         at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>>>>>         at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>>>>>         at
>>>>>> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>>>>>>         at
>>>>>> org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:66)
>>>>>>         at
>>>>>> org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:89)
>>>>>>         ... 7 more
>>>>>>     Caused by: java.lang.NullPointerException
>>>>>>         at
>>>>>> org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:954)
>>>>>>         at
>>>>>> org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:825)
>>>>>>         at
>>>>>> org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:888)
>>>>>>         at
>>>>>> org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:820)
>>>>>>         at
>>>>>> org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot.writeMappingsInKeyGroup(CopyOnWriteStateTableSnapshot.java:196)
>>>>>>         at
>>>>>> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:390)
>>>>>>         at
>>>>>> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:339)
>>>>>>         at
>>>>>> org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources.call(AbstractAsyncCallableWithResources.java:75)
>>>>>>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>>         at
>>>>>> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
>>>>>>         at
>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
>>>>>>         ... 5 more
>>>>>>     [CIRCULAR REFERENCE:java.lang.NullPointerException]
>>>>>>
>>>>>> Any ideas on why I'm hitting this especially when this (
>>>>>> https://issues.apache.org/jira/browse/FLINK-7756) says it has been
>>>>>> fixed in 1.4.2 ?
>>>>>>
>>>>>> On Sat, Nov 4, 2017 at 12:34 AM Federico D'Ambrosio <
>>>>>> federico.dambrosio@smartlab.ws> wrote:
>>>>>>
>>>>>>> Thank you very much for your steady response, Kostas!
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Federico
>>>>>>>
>>>>>>> 2017-11-03 16:26 GMT+01:00 Kostas Kloudas <
>>>>>>> k.kloudas@data-artisans.com>:
>>>>>>>
>>>>>>>> Hi Federico,
>>>>>>>>
>>>>>>>> Thanks for trying it out!
>>>>>>>> Great to hear that your problem was fixed!
>>>>>>>>
>>>>>>>> The feature freeze for the release is going to be next week, and I
>>>>>>>> would expect 1 or 2 more weeks testing.
>>>>>>>> So I would say in 2.5 weeks. But this is of course subject to
>>>>>>>> potential issues we may find during testing.
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Kostas
>>>>>>>>
>>>>>>>> On Nov 3, 2017, at 4:22 PM, Federico D'Ambrosio <
>>>>>>>> federico.dambrosio@smartlab.ws> wrote:
>>>>>>>>
>>>>>>>> Hi Kostas,
>>>>>>>>
>>>>>>>> I just tried running the same job with 1.4-SNAPSHOT for 10 minutes
>>>>>>>> and it didn't crash, so it was the same underlying issue as the JIRA you
>>>>>>>> linked.
>>>>>>>>
>>>>>>>> Do you happen to know when the 1.4 stable release is expected?
>>>>>>>>
>>>>>>>> Thank you very much,
>>>>>>>> Federico
>>>>>>>>
>>>>>>>> 2017-11-03 15:25 GMT+01:00 Kostas Kloudas <
>>>>>>>> k.kloudas@data-artisans.com>:
>>>>>>>>
>>>>>>>>> Perfect! thanks a lot!
>>>>>>>>>
>>>>>>>>> Kostas
>>>>>>>>>
>>>>>>>>> On Nov 3, 2017, at 3:23 PM, Federico D'Ambrosio <
>>>>>>>>> federico.dambrosio@smartlab.ws> wrote:
>>>>>>>>>
>>>>>>>>> Hi Kostas,
>>>>>>>>>
>>>>>>>>> yes, I'm using 1.3.2. I'll try the current master and I'll get
>>>>>>>>> back to you.
>>>>>>>>>
>>>>>>>>> 2017-11-03 15:21 GMT+01:00 Kostas Kloudas <
>>>>>>>>> k.kloudas@data-artisans.com>:
>>>>>>>>>
>>>>>>>>>> Hi Federico,
>>>>>>>>>>
>>>>>>>>>> I assume that you are using Flink 1.3, right?
>>>>>>>>>>
>>>>>>>>>> In this case, in 1.4 we have fixed a bug that seems similar to
>>>>>>>>>> your case:
>>>>>>>>>> https://issues.apache.org/jira/browse/FLINK-7756
>>>>>>>>>>
>>>>>>>>>> Could you try the current master to see if it fixes your problem?
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Kostas
>>>>>>>>>>
>>>>>>>>>> On Nov 3, 2017, at 3:12 PM, Federico D'Ambrosio <
>>>>>>>>>> federico.dambrosio@smartlab.ws> wrote:
>>>>>>>>>>
>>>>>>>>>>  Could not find id for
>>>>>>>>>> entry:

Re: FlinkCEP, circular references and checkpointing failures

Posted by Till Rohrmann <tr...@apache.org>.
Really good finding, Stefan!

On Wed, Nov 7, 2018 at 5:28 PM Stefan Richter <s....@data-artisans.com>
wrote:

> Hi,
>
> I think I can already spot the problem: LockableTypeSerializer.duplicate()
> is not properly implemented because it also has to call duplicate() on the
> element serialiser that is passed into the constructor of the new instance.
> I will open an issue and fix the problem.
>
> Best,
> Stefan
>
> On 7. Nov 2018, at 17:17, Till Rohrmann <tr...@apache.org> wrote:
>
> Hi Shailesh,
>
> could you maybe provide us with an example program that
> reproduces this problem? This would help the community to better debug the
> problem. It does not look right and might point towards a bug in Flink. Thanks
> a lot!
>
> Cheers,
> Till
>
> On Tue, Oct 30, 2018 at 9:10 AM Dawid Wysakowicz <dw...@apache.org>
> wrote:
>
>> This is a problem with serializing your events using Kryo. I'm adding
>> Gordon to cc, as he was recently working with serializers. He might give
>> you more insight into what is going wrong.
>>
>> Best,
>>
>> Dawid
>> On 25/10/2018 05:41, Shailesh Jain wrote:
>>
>> Hi Dawid,
>>
>> I've upgraded to Flink 1.6.1 and rebased my changes against the tag
>> 1.6.1; the only commit on top of 1.6 is this:
>> https://github.com/jainshailesh/flink/commit/797e3c4af5b28263fd98fb79daaba97cabf3392c
>>
>> I ran two separate, identical jobs (with and without checkpointing
>> enabled). I'm hitting an ArrayIndexOutOfBoundsException (and sometimes an NPE) *only
>> when checkpointing (HDFS backend) is enabled*, with the stack trace
>> below.
>>
>> I did see a similar problem with different operators here (
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/IndexOutOfBoundsException-on-deserialization-after-updating-to-1-6-1-td23933.html).
>> Is this a known issue which is getting addressed?
>>
>> Any ideas on what could be causing this?
>>
>> Thanks,
>> Shailesh
>>
>>
>> 2018-10-24 17:04:13,365 INFO
>> org.apache.flink.runtime.taskmanager.Task                     -
>> SelectCepOperatorMixedTime (1/1) - SelectCepOperatorMixedTime (1/1)
>> (3d984b7919342a3886593401088ca2cd) switched from RUNNING to FAILED.
>> org.apache.flink.util.FlinkRuntimeException: Failure happened in filter
>> function.
>>         at org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:731)
>>         at org.apache.flink.cep.nfa.NFA.computeNextStates(NFA.java:541)
>>         at org.apache.flink.cep.nfa.NFA.doProcess(NFA.java:284)
>>         at org.apache.flink.cep.nfa.NFA.process(NFA.java:220)
>>         at
>> org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.processEvent(AbstractKeyedCEPPatternOperator.java:379)
>>         at
>> org.apache.flink.cep.operator.AbstractKeyedCEPPatternMixedTimeApproachOperator.processElement(AbstractKeyedCEPPatternMixedTimeApproachOperator.java:45)
>>         at
>> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>>         at
>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>>         at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>>         at java.lang.Thread.run(Thread.java:748)
>> Caused by: org.apache.flink.util.WrappingRuntimeException:
>> java.lang.ArrayIndexOutOfBoundsException: -1
>>         at
>> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.lambda$materializeMatch$1(SharedBuffer.java:305)
>>         at java.util.HashMap.computeIfAbsent(HashMap.java:1127)
>>         at
>> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.materializeMatch(SharedBuffer.java:301)
>>         at
>> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.materializeMatch(SharedBuffer.java:291)
>>         at
>> org.apache.flink.cep.nfa.NFA$ConditionContext.getEventsForPattern(NFA.java:811)
>>         at
>> com.stellapps.contrakcep.flink.patterns.critical.AgitatorMalfunctionAfterChillingPattern$1.filter(AgitatorMalfunctionAfterChillingPattern.java:70)
>>         at
>> com.stellapps.contrakcep.flink.patterns.critical.AgitatorMalfunctionAfterChillingPattern$1.filter(AgitatorMalfunctionAfterChillingPattern.java:62)
>>         at org.apache.flink.cep.nfa.NFA.checkFilterCondition(NFA.java:742)
>>         at org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:716)
>>         ... 10 more
>> Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
>>         at com.esotericsoftware.kryo.util.IntArray.add(IntArray.java:61)
>>         at
>> com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:800)
>>         at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:655)
>>         at
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:231)
>>         at
>> org.apache.flink.cep.nfa.sharedbuffer.Lockable$LockableTypeSerializer.copy(Lockable.java:120)
>>         at
>> org.apache.flink.cep.nfa.sharedbuffer.Lockable$LockableTypeSerializer.copy(Lockable.java:95)
>>         at
>> org.apache.flink.api.common.typeutils.base.MapSerializer.copy(MapSerializer.java:113)
>>         at
>> org.apache.flink.api.common.typeutils.base.MapSerializer.copy(MapSerializer.java:49)
>>         at
>> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:287)
>>         at
>> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:311)
>>         at
>> org.apache.flink.runtime.state.heap.HeapMapState.get(HeapMapState.java:85)
>>         at
>> org.apache.flink.runtime.state.UserFacingMapState.get(UserFacingMapState.java:47)
>>         at
>> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.lambda$materializeMatch$1(SharedBuffer.java:303)
>>         ... 18 more
>>
>> On Fri, Sep 28, 2018 at 11:00 AM Shailesh Jain <
>> shailesh.jain@stellapps.com> wrote:
>>
>>> Hi Dawid,
>>>
>>> Thanks for your time on this. The diff should have pointed out only the
>>> top 3 commits, but since it did not, it is possible I did not rebase my
>>> branch against 1.4.2 correctly. I'll check this out and get back to you if
>>> I hit the same issue again.
>>>
>>> Thanks again,
>>> Shailesh
>>>
>>> On Thu, Sep 27, 2018 at 1:00 PM Dawid Wysakowicz <dw...@apache.org>
>>> wrote:
>>>
>>>> Hi Shailesh,
>>>>
>>>> I am afraid it is going to be hard to help you, as this branch differs
>>>> significantly from the 1.4.2 release (I've diffed your branch against
>>>> tag/release-1.4.2). Moreover, the code in the branch you've provided still
>>>> does not correspond to the lines in the exception you posted previously.
>>>> Could you check whether the problem occurs on vanilla Flink as well?
>>>>
>>>> Best,
>>>>
>>>> Dawid
>>>>
>>>> On 27/09/18 08:22, Shailesh Jain wrote:
>>>>
>>>> Hi Dawid,
>>>>
>>>> Yes, it is version 1.4.2. We are running vanilla flink, but have added
>>>> a couple of changes in the CEP operator specifically (top 3 commits here:
>>>> https://github.com/jainshailesh/flink/commits/poc_on_1.4.2). Changes
>>>> I've made to CEP operators do not touch the checkpointing path, just
>>>> overloading the operator for a specific way of handling event time.
>>>>
>>>> We are hitting this in production, so I'm not sure it'll be feasible to
>>>> move to 1.6.0 immediately, but eventually yes.
>>>>
>>>> Thanks,
>>>> Shailesh
>>>>
>>>> On Wed, Sep 26, 2018 at 5:44 PM Dawid Wysakowicz <
>>>> dwysakowicz@apache.org> wrote:
>>>>
>>>>> Hi Shailesh,
>>>>>
>>>>> Are you sure you are using version 1.4.2? Are you running vanilla Flink,
>>>>> or have you introduced some changes? I am asking because the line numbers
>>>>> in the stack trace do not align with the source code for 1.4.2.
>>>>>
>>>>> Also, it is a different exception from the one in the issue you've
>>>>> linked, so if it is a problem then it is definitely a different one. Lastly,
>>>>> I would recommend upgrading to the newest version, as we rewrote
>>>>> the SharedBuffer implementation in 1.6.0.
>>>>>
>>>>> Best,
>>>>>
>>>>> Dawid
>>>>>
>>>>> On 26/09/18 13:50, Shailesh Jain wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> I think I've hit this same issue on a 3 node standalone cluster
>>>>> (1.4.2) using HDFS (2.8.4) as state backend.
>>>>>
>>>>> 2018-09-26 17:07:39,370 INFO
>>>>> org.apache.flink.runtime.taskmanager.Task                     - Attempting
>>>>> to fail task externally SelectCepOperator (1/1)
>>>>> (3bec4aa1ef2226c4e0c5ff7b3860d340).
>>>>> 2018-09-26 17:07:39,370 INFO
>>>>> org.apache.flink.runtime.taskmanager.Task                     -
>>>>> SelectCepOperator (1/1) (3bec4aa1ef2226c4e0c5ff7b3860d340) switched from
>>>>> RUNNING to FAILED.
>>>>> AsynchronousException{java.lang.Exception: Could not materialize
>>>>> checkpoint 6 for operator SelectCepOperator (1/1).}
>>>>>     at
>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:948)
>>>>>     at
>>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>     at
>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>     at
>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>> Caused by: java.lang.Exception: Could not materialize checkpoint 6 for
>>>>> operator SelectCepOperator (1/1).
>>>>>     ... 6 more
>>>>> Caused by: java.util.concurrent.ExecutionException:
>>>>> java.lang.NullPointerException
>>>>>     at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>>>>     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>>>>     at
>>>>> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>>>>>     at
>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
>>>>>     ... 5 more
>>>>>     Suppressed: java.lang.Exception: Could not properly cancel managed
>>>>> keyed state future.
>>>>>         at
>>>>> org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:91)
>>>>>         at
>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:976)
>>>>>         at
>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:939)
>>>>>         ... 5 more
>>>>>     Caused by: java.util.concurrent.ExecutionException:
>>>>> java.lang.NullPointerException
>>>>>         at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>>>>         at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>>>>         at
>>>>> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>>>>>         at
>>>>> org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:66)
>>>>>         at
>>>>> org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:89)
>>>>>         ... 7 more
>>>>>     Caused by: java.lang.NullPointerException
>>>>>         at
>>>>> org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:954)
>>>>>         at
>>>>> org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:825)
>>>>>         at
>>>>> org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:888)
>>>>>         at
>>>>> org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:820)
>>>>>         at
>>>>> org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot.writeMappingsInKeyGroup(CopyOnWriteStateTableSnapshot.java:196)
>>>>>         at
>>>>> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:390)
>>>>>         at
>>>>> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:339)
>>>>>         at
>>>>> org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources.call(AbstractAsyncCallableWithResources.java:75)
>>>>>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>         at
>>>>> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
>>>>>         at
>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
>>>>>         ... 5 more
>>>>>     [CIRCULAR REFERENCE:java.lang.NullPointerException]
>>>>>
>>>>> Any ideas on why I'm hitting this especially when this (
>>>>> https://issues.apache.org/jira/browse/FLINK-7756) says it has been
>>>>> fixed in 1.4.2 ?
>>>>>
>>>>> On Sat, Nov 4, 2017 at 12:34 AM Federico D'Ambrosio <
>>>>> federico.dambrosio@smartlab.ws> wrote:
>>>>>
>>>>>> Thank you very much for your steady response, Kostas!
>>>>>>
>>>>>> Cheers,
>>>>>> Federico
>>>>>>
>>>>>> 2017-11-03 16:26 GMT+01:00 Kostas Kloudas <
>>>>>> k.kloudas@data-artisans.com>:
>>>>>>
>>>>>>> Hi Federico,
>>>>>>>
>>>>>>> Thanks for trying it out!
>>>>>>> Great to hear that your problem was fixed!
>>>>>>>
>>>>>>> The feature freeze for the release is going to be next week, and I
>>>>>>> would expect 1 or 2 more weeks testing.
>>>>>>> So I would say in 2.5 weeks. But this is of course subject to
>>>>>>> potential issues we may find during testing.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Kostas
>>>>>>>
>>>>>>> On Nov 3, 2017, at 4:22 PM, Federico D'Ambrosio <
>>>>>>> federico.dambrosio@smartlab.ws> wrote:
>>>>>>>
>>>>>>> Hi Kostas,
>>>>>>>
>>>>>>> I just tried running the same job with 1.4-SNAPSHOT for 10 minutes
>>>>>>> and it didn't crash, so it was the same underlying issue as the JIRA you
>>>>>>> linked.
>>>>>>>
>>>>>>> Do you happen to know when the 1.4 stable release is expected?
>>>>>>>
>>>>>>> Thank you very much,
>>>>>>> Federico
>>>>>>>
>>>>>>> 2017-11-03 15:25 GMT+01:00 Kostas Kloudas <
>>>>>>> k.kloudas@data-artisans.com>:
>>>>>>>
>>>>>>>> Perfect! thanks a lot!
>>>>>>>>
>>>>>>>> Kostas
>>>>>>>>
>>>>>>>> On Nov 3, 2017, at 3:23 PM, Federico D'Ambrosio <
>>>>>>>> federico.dambrosio@smartlab.ws> wrote:
>>>>>>>>
>>>>>>>> Hi Kostas,
>>>>>>>>
>>>>>>>> yes, I'm using 1.3.2. I'll try the current master and I'll get back
>>>>>>>> to you.
>>>>>>>>
>>>>>>>> 2017-11-03 15:21 GMT+01:00 Kostas Kloudas <
>>>>>>>> k.kloudas@data-artisans.com>:
>>>>>>>>
>>>>>>>>> Hi Federico,
>>>>>>>>>
>>>>>>>>> I assume that you are using Flink 1.3, right?
>>>>>>>>>
>>>>>>>>> In this case, in 1.4 we have fixed a bug that seems similar to
>>>>>>>>> your case:
>>>>>>>>> https://issues.apache.org/jira/browse/FLINK-7756
>>>>>>>>>
>>>>>>>>> Could you try the current master to see if it fixes your problem?
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Kostas
>>>>>>>>>
>>>>>>>>> On Nov 3, 2017, at 3:12 PM, Federico D'Ambrosio <
>>>>>>>>> federico.dambrosio@smartlab.ws> wrote:
>>>>>>>>>
>>>>>>>>>  Could not find id for
>>>>>>>>> entry:

Re: FlinkCEP, circular references and checkpointing failures

Posted by Stefan Richter <s....@data-artisans.com>.
Hi,

I think I can already spot the problem: LockableTypeSerializer.duplicate() is not properly implemented because it also has to call duplicate() on the element serialiser that is passed into the constructor of the new instance. I will open an issue and fix the problem.
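
For readers following along, here is a minimal sketch of the duplicate() point above. It is not the actual Flink code: the Serializer interface, StatefulSerializer and WrappingSerializer below are hypothetical stand-ins for TypeSerializer, a Kryo-backed element serializer and LockableTypeSerializer. The assumption is that the element serializer keeps mutable internal state, so a single instance must not be used from two threads at once; a duplicate of the wrapper is only independent if it duplicates the element serializer too.

```java
// Illustrative sketch only: these types are hypothetical stand-ins,
// not Flink's TypeSerializer or LockableTypeSerializer.
class DuplicateSketch {

    /** Minimal serializer contract: copy a value, or duplicate the serializer itself. */
    interface Serializer<T> {
        T copy(T value);
        Serializer<T> duplicate();
    }

    /** Stateful serializer: the scratch buffer makes one instance unsafe to share across threads. */
    static final class StatefulSerializer implements Serializer<String> {
        private final StringBuilder scratch = new StringBuilder();

        @Override
        public String copy(String value) {
            scratch.setLength(0);
            scratch.append(value);
            return scratch.toString();
        }

        @Override
        public Serializer<String> duplicate() {
            return new StatefulSerializer(); // fresh instance with its own scratch buffer
        }
    }

    /** Wrapper that delegates all work to an element serializer. */
    static final class WrappingSerializer<T> implements Serializer<T> {
        final Serializer<T> element;

        WrappingSerializer(Serializer<T> element) {
            this.element = element;
        }

        @Override
        public T copy(T value) {
            return element.copy(value);
        }

        /** Problematic variant: the new wrapper still shares the element serializer. */
        WrappingSerializer<T> brokenDuplicate() {
            return new WrappingSerializer<>(element);
        }

        /** Intended variant: the element serializer is duplicated as well. */
        @Override
        public WrappingSerializer<T> duplicate() {
            return new WrappingSerializer<>(element.duplicate());
        }
    }

    public static void main(String[] args) {
        WrappingSerializer<String> original =
                new WrappingSerializer<>(new StatefulSerializer());
        System.out.println("broken duplicate shares element serializer: "
                + (original.brokenDuplicate().element == original.element));
        System.out.println("fixed duplicate shares element serializer: "
                + (original.duplicate().element == original.element));
    }
}
```

With the broken variant, a second thread that receives the "duplicate" would still reach the same stateful element serializer. That would be consistent with the failure showing up only when checkpointing is enabled, though the sketch does not reproduce the race itself.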

Best,
Stefan

> On 7. Nov 2018, at 17:17, Till Rohrmann <tr...@apache.org> wrote:
> 
> Hi Shailesh,
> 
> could you maybe provide us with an example program that reproduces this problem? This would help the community to better debug the problem. It does not look right and might point towards a bug in Flink. Thanks a lot!
> 
> Cheers,
> Till
> 
> On Tue, Oct 30, 2018 at 9:10 AM Dawid Wysakowicz <dwysakowicz@apache.org> wrote:
> This is a problem with serializing your events using Kryo. I'm adding Gordon to cc, as he was recently working with serializers. He might give you more insight into what is going wrong.
> 
> Best,
> 
> Dawid
> 
> On 25/10/2018 05:41, Shailesh Jain wrote:
>> Hi Dawid,
>> 
>> I've upgraded to Flink 1.6.1 and rebased my changes against the tag 1.6.1; the only commit on top of 1.6 is this: https://github.com/jainshailesh/flink/commit/797e3c4af5b28263fd98fb79daaba97cabf3392c
>> 
>> I ran two separate, identical jobs (with and without checkpointing enabled). I'm hitting an ArrayIndexOutOfBoundsException (and sometimes an NPE) only when checkpointing (HDFS backend) is enabled, with the stack trace below.
>> 
>> I did see a similar problem with different operators here (http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/IndexOutOfBoundsException-on-deserialization-after-updating-to-1-6-1-td23933.html). Is this a known issue that is being addressed?
>> 
>> Any ideas on what could be causing this?
>> 
>> Thanks,
>> Shailesh
>> 
>> 
>> 2018-10-24 17:04:13,365 INFO  org.apache.flink.runtime.taskmanager.Task                     - SelectCepOperatorMixedTime (1/1) - SelectCepOperatorMixedTime (1/1) (3d984b7919342a3886593401088ca2cd) switched from RUNNING to FAILED.
>> org.apache.flink.util.FlinkRuntimeException: Failure happened in filter function.
>>         at org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:731)
>>         at org.apache.flink.cep.nfa.NFA.computeNextStates(NFA.java:541)
>>         at org.apache.flink.cep.nfa.NFA.doProcess(NFA.java:284)
>>         at org.apache.flink.cep.nfa.NFA.process(NFA.java:220)
>>         at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.processEvent(AbstractKeyedCEPPatternOperator.java:379)
>>         at org.apache.flink.cep.operator.AbstractKeyedCEPPatternMixedTimeApproachOperator.processElement(AbstractKeyedCEPPatternMixedTimeApproachOperator.java:45)
>>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>>         at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>>         at java.lang.Thread.run(Thread.java:748)
>> Caused by: org.apache.flink.util.WrappingRuntimeException: java.lang.ArrayIndexOutOfBoundsException: -1
>>         at org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.lambda$materializeMatch$1(SharedBuffer.java:305)
>>         at java.util.HashMap.computeIfAbsent(HashMap.java:1127)
>>         at org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.materializeMatch(SharedBuffer.java:301)
>>         at org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.materializeMatch(SharedBuffer.java:291)
>>         at org.apache.flink.cep.nfa.NFA$ConditionContext.getEventsForPattern(NFA.java:811)
>>         at com.stellapps.contrakcep.flink.patterns.critical.AgitatorMalfunctionAfterChillingPattern$1.filter(AgitatorMalfunctionAfterChillingPattern.java:70)
>>         at com.stellapps.contrakcep.flink.patterns.critical.AgitatorMalfunctionAfterChillingPattern$1.filter(AgitatorMalfunctionAfterChillingPattern.java:62)
>>         at org.apache.flink.cep.nfa.NFA.checkFilterCondition(NFA.java:742)
>>         at org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:716)
>>         ... 10 more
>> Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
>>         at com.esotericsoftware.kryo.util.IntArray.add(IntArray.java:61)
>>         at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:800)
>>         at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:655)
>>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:231)
>>         at org.apache.flink.cep.nfa.sharedbuffer.Lockable$LockableTypeSerializer.copy(Lockable.java:120)
>>         at org.apache.flink.cep.nfa.sharedbuffer.Lockable$LockableTypeSerializer.copy(Lockable.java:95)
>>         at org.apache.flink.api.common.typeutils.base.MapSerializer.copy(MapSerializer.java:113)
>>         at org.apache.flink.api.common.typeutils.base.MapSerializer.copy(MapSerializer.java:49)
>>         at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:287)
>>         at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:311)
>>         at org.apache.flink.runtime.state.heap.HeapMapState.get(HeapMapState.java:85)
>>         at org.apache.flink.runtime.state.UserFacingMapState.get(UserFacingMapState.java:47)
>>         at org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.lambda$materializeMatch$1(SharedBuffer.java:303)
>>         ... 18 more
>> 
>> On Fri, Sep 28, 2018 at 11:00 AM Shailesh Jain <shailesh.jain@stellapps.com <ma...@stellapps.com>> wrote:
>> Hi Dawid,
>> 
>> Thanks for your time on this. The diff should have pointed out only the top 3 commits, but since it did not, it is possible I did not rebase my branch against 1.4.2 correctly. I'll check this out and get back to you if I hit the same issue again.
>> 
>> Thanks again,
>> Shailesh
>> 
>> On Thu, Sep 27, 2018 at 1:00 PM Dawid Wysakowicz <dwysakowicz@apache.org <ma...@apache.org>> wrote:
>> Hi Shailesh,
>> 
>> I am afraid it is going to be hard to help you, as this branch differs significantly from the 1.4.2 release (I've done a diff between your branch and tag/release-1.4.2). Moreover, the code in the branch you've provided still does not correspond to the lines in the exception you've posted previously. Could you check if the problem occurs on vanilla Flink as well?
>> 
>> Best,
>> 
>> Dawid
>> 
>> 
>> On 27/09/18 08:22, Shailesh Jain wrote:
>>> Hi Dawid,
>>> 
>>> Yes, it is version 1.4.2. We are running vanilla flink, but have added a couple of changes in the CEP operator specifically (top 3 commits here: https://github.com/jainshailesh/flink/commits/poc_on_1.4.2). Changes I've made to CEP operators do not touch the checkpointing path, just overloading the operator for a specific way of handling event time.
>>> 
>>> We are hitting this in production, so I'm not sure it'll be feasible to move to 1.6.0 immediately, but eventually yes.
>>> 
>>> Thanks,
>>> Shailesh
>>> 
>>> On Wed, Sep 26, 2018 at 5:44 PM Dawid Wysakowicz <dwysakowicz@apache.org <ma...@apache.org>> wrote:
>>> Hi Shailesh,
>>> 
>>> Are you sure you are using version 1.4.2? Do you run vanilla Flink, or have you introduced some changes? I am asking because the line numbers in the stack trace do not align with the source code for 1.4.2.
>>> 
>>> Also, it is a different exception than the one in the issue you've linked, so if it is a problem then it is definitely a different one. Lastly, I would recommend upgrading to the newest version, as we have rewritten the SharedBuffer implementation in 1.6.0.
>>> 
>>> Best,
>>> 
>>> Dawid
>>> 
>>> 
>>> On 26/09/18 13:50, Shailesh Jain wrote:
>>>> Hi,
>>>> 
>>>> I think I've hit this same issue on a 3 node standalone cluster (1.4.2) using HDFS (2.8.4) as state backend.
>>>> 
>>>> 2018-09-26 17:07:39,370 INFO  org.apache.flink.runtime.taskmanager.Task                     - Attempting to fail task externally SelectCepOperator (1/1) (3bec4aa1ef2226c4e0c5ff7b3860d340).
>>>> 2018-09-26 17:07:39,370 INFO  org.apache.flink.runtime.taskmanager.Task                     - SelectCepOperator (1/1) (3bec4aa1ef2226c4e0c5ff7b3860d340) switched from RUNNING to FAILED.
>>>> AsynchronousException{java.lang.Exception: Could not materialize checkpoint 6 for operator SelectCepOperator (1/1).}
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:948)
>>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>     at java.lang.Thread.run(Thread.java:748)
>>>> Caused by: java.lang.Exception: Could not materialize checkpoint 6 for operator SelectCepOperator (1/1).
>>>>     ... 6 more
>>>> Caused by: java.util.concurrent.ExecutionException: java.lang.NullPointerException
>>>>     at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>>>     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>>>     at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
>>>>     ... 5 more
>>>>     Suppressed: java.lang.Exception: Could not properly cancel managed keyed state future.
>>>>         at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:91)
>>>>         at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:976)
>>>>         at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:939)
>>>>         ... 5 more
>>>>     Caused by: java.util.concurrent.ExecutionException: java.lang.NullPointerException
>>>>         at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>>>         at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>>>         at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>>>>         at org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:66)
>>>>         at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:89)
>>>>         ... 7 more
>>>>     Caused by: java.lang.NullPointerException
>>>>         at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:954)
>>>>         at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:825)
>>>>         at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:888)
>>>>         at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:820)
>>>>         at org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot.writeMappingsInKeyGroup(CopyOnWriteStateTableSnapshot.java:196)
>>>>         at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:390)
>>>>         at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:339)
>>>>         at org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources.call(AbstractAsyncCallableWithResources.java:75)
>>>>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>         at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
>>>>         at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
>>>>         ... 5 more
>>>>     [CIRCULAR REFERENCE:java.lang.NullPointerException]
>>>> 
>>>> Any ideas on why I'm hitting this, especially when this (https://issues.apache.org/jira/browse/FLINK-7756) says it has been fixed in 1.4.2?
>>>> 
>>>> On Sat, Nov 4, 2017 at 12:34 AM Federico D'Ambrosio <federico.dambrosio@smartlab.ws <ma...@smartlab.ws>> wrote:
>>>> Thank you very much for your steady response, Kostas!
>>>> 
>>>> Cheers,
>>>> Federico
>>>> 
>>>> 2017-11-03 16:26 GMT+01:00 Kostas Kloudas <k.kloudas@data-artisans.com <ma...@data-artisans.com>>:
>>>> Hi Federico,
>>>> 
>>>> Thanks for trying it out! 
>>>> Great to hear that your problem was fixed!
>>>> 
>>>> The feature freeze for the release is going to be next week, and I would expect 1 or 2 more weeks testing.
>>>> So I would say in 2.5 weeks. But this is of course subject to potential issues we may find during testing.
>>>> 
>>>> Cheers,
>>>> Kostas
>>>> 
>>>>> On Nov 3, 2017, at 4:22 PM, Federico D'Ambrosio <federico.dambrosio@smartlab.ws <ma...@smartlab.ws>> wrote:
>>>>> 
>>>>> Hi Kostas,
>>>>> 
>>>>> I just tried running the same job with 1.4-SNAPSHOT for 10 minutes and it didn't crash, so it was the same underlying issue as in the JIRA you linked.
>>>>> 
>>>>> Do you happen to know when the 1.4 stable release is expected?
>>>>> 
>>>>> Thank you very much,
>>>>> Federico
>>>>> 
>>>>> 2017-11-03 15:25 GMT+01:00 Kostas Kloudas <k.kloudas@data-artisans.com <ma...@data-artisans.com>>:
>>>>> Perfect! thanks a lot!
>>>>> 
>>>>> Kostas
>>>>> 
>>>>>> On Nov 3, 2017, at 3:23 PM, Federico D'Ambrosio <federico.dambrosio@smartlab.ws <ma...@smartlab.ws>> wrote:
>>>>>> 
>>>>>> Hi Kostas, 
>>>>>> 
>>>>>> yes, I'm using 1.3.2. I'll try the current master and I'll get back to you.
>>>>>> 
>>>>>> 2017-11-03 15:21 GMT+01:00 Kostas Kloudas <k.kloudas@data-artisans.com <ma...@data-artisans.com>>:
>>>>>> Hi Federico,
>>>>>> 
>>>>>> I assume that you are using Flink 1.3, right?
>>>>>> 
>>>>>> In this case, in 1.4 we have fixed a bug that seems similar to your case:
>>>>>> https://issues.apache.org/jira/browse/FLINK-7756
>>>>>> 
>>>>>> Could you try the current master to see if it fixes your problem?
>>>>>> 
>>>>>> Thanks,
>>>>>> Kostas
>>>>>> 
>>>>>>> On Nov 3, 2017, at 3:12 PM, Federico D'Ambrosio <federico.dambrosio@smartlab.ws <ma...@smartlab.ws>> wrote:
>>>>>>> 
>>>>>>>  Could not find id for entry:                                                        


Re: FlinkCEP, circular references and checkpointing failures

Posted by Till Rohrmann <tr...@apache.org>.
Hi Shailesh,

could you maybe provide us with an example program that reproduces this
problem? This would help the community debug it. This does not look right
and might point towards a bug in Flink. Thanks a lot!

Cheers,
Till

On Tue, Oct 30, 2018 at 9:10 AM Dawid Wysakowicz <dw...@apache.org>
wrote:

> This is a problem with serializing your events using Kryo. I'm adding
> Gordon to cc, as he was recently working on the serializers. He might be
> able to give you more insight into what is going wrong.
>
> Best,
>
> Dawid
> On 25/10/2018 05:41, Shailesh Jain wrote:
>
> Hi Dawid,
>
> I've upgraded to Flink 1.6.1 and rebased my changes against the tag 1.6.1;
> the only commit on top of 1.6 is this:
> https://github.com/jainshailesh/flink/commit/797e3c4af5b28263fd98fb79daaba97cabf3392c
>
> I ran two separate identical jobs (with and without checkpointing
> enabled). I'm hitting an ArrayIndexOutOfBoundsException (and sometimes an
> NPE) *only when checkpointing (HDFS backend) is enabled*, with the stack
> trace below.
>
> I did see a similar problem with different operators here (
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/IndexOutOfBoundsException-on-deserialization-after-updating-to-1-6-1-td23933.html).
> Is this a known issue which is getting addressed?
>
> Any ideas on what could be causing this?
>
> Thanks,
> Shailesh
>
>
> 2018-10-24 17:04:13,365 INFO
> org.apache.flink.runtime.taskmanager.Task                     -
> SelectCepOperatorMixedTime (1/1) - SelectCepOperatorMixedTime (1/1)
> (3d984b7919342a3886593401088ca2cd) switched from RUNNING to FAILED.
> org.apache.flink.util.FlinkRuntimeException: Failure happened in filter
> function.
>         at org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:731)
>         at org.apache.flink.cep.nfa.NFA.computeNextStates(NFA.java:541)
>         at org.apache.flink.cep.nfa.NFA.doProcess(NFA.java:284)
>         at org.apache.flink.cep.nfa.NFA.process(NFA.java:220)
>         at
> org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.processEvent(AbstractKeyedCEPPatternOperator.java:379)
>         at
> org.apache.flink.cep.operator.AbstractKeyedCEPPatternMixedTimeApproachOperator.processElement(AbstractKeyedCEPPatternMixedTimeApproachOperator.java:45)
>         at
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>         at
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>         at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.WrappingRuntimeException:
> java.lang.ArrayIndexOutOfBoundsException: -1
>         at
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.lambda$materializeMatch$1(SharedBuffer.java:305)
>         at java.util.HashMap.computeIfAbsent(HashMap.java:1127)
>         at
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.materializeMatch(SharedBuffer.java:301)
>         at
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.materializeMatch(SharedBuffer.java:291)
>         at
> org.apache.flink.cep.nfa.NFA$ConditionContext.getEventsForPattern(NFA.java:811)
>         at
> com.stellapps.contrakcep.flink.patterns.critical.AgitatorMalfunctionAfterChillingPattern$1.filter(AgitatorMalfunctionAfterChillingPattern.java:70)
>         at
> com.stellapps.contrakcep.flink.patterns.critical.AgitatorMalfunctionAfterChillingPattern$1.filter(AgitatorMalfunctionAfterChillingPattern.java:62)
>         at org.apache.flink.cep.nfa.NFA.checkFilterCondition(NFA.java:742)
>         at org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:716)
>         ... 10 more
> Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
>         at com.esotericsoftware.kryo.util.IntArray.add(IntArray.java:61)
>         at
> com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:800)
>         at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:655)
>         at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:231)
>         at
> org.apache.flink.cep.nfa.sharedbuffer.Lockable$LockableTypeSerializer.copy(Lockable.java:120)
>         at
> org.apache.flink.cep.nfa.sharedbuffer.Lockable$LockableTypeSerializer.copy(Lockable.java:95)
>         at
> org.apache.flink.api.common.typeutils.base.MapSerializer.copy(MapSerializer.java:113)
>         at
> org.apache.flink.api.common.typeutils.base.MapSerializer.copy(MapSerializer.java:49)
>         at
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:287)
>         at
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:311)
>         at
> org.apache.flink.runtime.state.heap.HeapMapState.get(HeapMapState.java:85)
>         at
> org.apache.flink.runtime.state.UserFacingMapState.get(UserFacingMapState.java:47)
>         at
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.lambda$materializeMatch$1(SharedBuffer.java:303)
>         ... 18 more
>
> On Fri, Sep 28, 2018 at 11:00 AM Shailesh Jain <
> shailesh.jain@stellapps.com> wrote:
>
>> Hi Dawid,
>>
>> Thanks for your time on this. The diff should have pointed out only the
>> top 3 commits, but since it did not, it is possible I did not rebase my
>> branch against 1.4.2 correctly. I'll check this out and get back to you if
>> I hit the same issue again.
>>
>> Thanks again,
>> Shailesh
>>
>> On Thu, Sep 27, 2018 at 1:00 PM Dawid Wysakowicz <dw...@apache.org>
>> wrote:
>>
>>> Hi Shailesh,
>>>
>>> I am afraid it is going to be hard to help you, as this branch differs
>>> significantly from the 1.4.2 release (I've done a diff between your branch
>>> and tag/release-1.4.2). Moreover, the code in the branch you've provided
>>> still does not correspond to the lines in the exception you've posted
>>> previously. Could you check if the problem occurs on vanilla Flink as well?
>>>
>>> Best,
>>>
>>> Dawid
>>>
>>> On 27/09/18 08:22, Shailesh Jain wrote:
>>>
>>> Hi Dawid,
>>>
>>> Yes, it is version 1.4.2. We are running vanilla flink, but have added a
>>> couple of changes in the CEP operator specifically (top 3 commits here:
>>> https://github.com/jainshailesh/flink/commits/poc_on_1.4.2). Changes
>>> I've made to CEP operators do not touch the checkpointing path, just
>>> overloading the operator for a specific way of handling event time.
>>>
>>> We are hitting this in production, so I'm not sure it'll be feasible to
>>> move to 1.6.0 immediately, but eventually yes.
>>>
>>> Thanks,
>>> Shailesh
>>>
>>> On Wed, Sep 26, 2018 at 5:44 PM Dawid Wysakowicz <dw...@apache.org>
>>> wrote:
>>>
>>>> Hi Shailesh,
>>>>
>>>> Are you sure you are using version 1.4.2? Do you run vanilla Flink,
>>>> or have you introduced some changes? I am asking because the line
>>>> numbers in the stack trace do not align with the source code for 1.4.2.
>>>>
>>>> Also, it is a different exception than the one in the issue you've
>>>> linked, so if it is a problem then it is definitely a different one.
>>>> Lastly, I would recommend upgrading to the newest version, as we have
>>>> rewritten the SharedBuffer implementation in 1.6.0.
>>>>
>>>> Best,
>>>>
>>>> Dawid
>>>>
>>>> On 26/09/18 13:50, Shailesh Jain wrote:
>>>>
>>>> Hi,
>>>>
>>>> I think I've hit this same issue on a 3 node standalone cluster (1.4.2)
>>>> using HDFS (2.8.4) as state backend.
>>>>
>>>> 2018-09-26 17:07:39,370 INFO
>>>> org.apache.flink.runtime.taskmanager.Task                     - Attempting
>>>> to fail task externally SelectCepOperator (1/1)
>>>> (3bec4aa1ef2226c4e0c5ff7b3860d340).
>>>> 2018-09-26 17:07:39,370 INFO
>>>> org.apache.flink.runtime.taskmanager.Task                     -
>>>> SelectCepOperator (1/1) (3bec4aa1ef2226c4e0c5ff7b3860d340) switched from
>>>> RUNNING to FAILED.
>>>> AsynchronousException{java.lang.Exception: Could not materialize
>>>> checkpoint 6 for operator SelectCepOperator (1/1).}
>>>>     at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:948)
>>>>     at
>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>     at
>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>     at
>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>     at java.lang.Thread.run(Thread.java:748)
>>>> Caused by: java.lang.Exception: Could not materialize checkpoint 6 for
>>>> operator SelectCepOperator (1/1).
>>>>     ... 6 more
>>>> Caused by: java.util.concurrent.ExecutionException:
>>>> java.lang.NullPointerException
>>>>     at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>>>     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>>>     at
>>>> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>>>>     at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
>>>>     ... 5 more
>>>>     Suppressed: java.lang.Exception: Could not properly cancel managed
>>>> keyed state future.
>>>>         at
>>>> org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:91)
>>>>         at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:976)
>>>>         at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:939)
>>>>         ... 5 more
>>>>     Caused by: java.util.concurrent.ExecutionException:
>>>> java.lang.NullPointerException
>>>>         at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>>>         at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>>>         at
>>>> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>>>>         at
>>>> org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:66)
>>>>         at
>>>> org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:89)
>>>>         ... 7 more
>>>>     Caused by: java.lang.NullPointerException
>>>>         at
>>>> org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:954)
>>>>         at
>>>> org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:825)
>>>>         at
>>>> org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:888)
>>>>         at
>>>> org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:820)
>>>>         at
>>>> org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot.writeMappingsInKeyGroup(CopyOnWriteStateTableSnapshot.java:196)
>>>>         at
>>>> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:390)
>>>>         at
>>>> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:339)
>>>>         at
>>>> org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources.call(AbstractAsyncCallableWithResources.java:75)
>>>>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>         at
>>>> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
>>>>         at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
>>>>         ... 5 more
>>>>     [CIRCULAR REFERENCE:java.lang.NullPointerException]
>>>>
>>>> Any ideas on why I'm hitting this, especially when this (
>>>> https://issues.apache.org/jira/browse/FLINK-7756) says it has been
>>>> fixed in 1.4.2?
>>>>
>>>> On Sat, Nov 4, 2017 at 12:34 AM Federico D'Ambrosio <
>>>> federico.dambrosio@smartlab.ws> wrote:
>>>>
>>>>> Thank you very much for your steady response, Kostas!
>>>>>
>>>>> Cheers,
>>>>> Federico
>>>>>
>>>>> 2017-11-03 16:26 GMT+01:00 Kostas Kloudas <k.kloudas@data-artisans.com
>>>>> >:
>>>>>
>>>>>> Hi Federico,
>>>>>>
>>>>>> Thanks for trying it out!
>>>>>> Great to hear that your problem was fixed!
>>>>>>
>>>>>> The feature freeze for the release is going to be next week, and I
>>>>>> would expect 1 or 2 more weeks testing.
>>>>>> So I would say in 2.5 weeks. But this is of course subject to
>>>>>> potential issues we may find during testing.
>>>>>>
>>>>>> Cheers,
>>>>>> Kostas
>>>>>>
>>>>>> On Nov 3, 2017, at 4:22 PM, Federico D'Ambrosio <
>>>>>> federico.dambrosio@smartlab.ws> wrote:
>>>>>>
>>>>>> Hi Kostas,
>>>>>>
>>>>>> I just tried running the same job with 1.4-SNAPSHOT for 10 minutes
>>>>>> and it didn't crash, so it was the same underlying issue as in the
>>>>>> JIRA you linked.
>>>>>>
>>>>>> Do you happen to know when the 1.4 stable release is expected?
>>>>>>
>>>>>> Thank you very much,
>>>>>> Federico
>>>>>>
>>>>>> 2017-11-03 15:25 GMT+01:00 Kostas Kloudas <
>>>>>> k.kloudas@data-artisans.com>:
>>>>>>
>>>>>>> Perfect! thanks a lot!
>>>>>>>
>>>>>>> Kostas
>>>>>>>
>>>>>>> On Nov 3, 2017, at 3:23 PM, Federico D'Ambrosio <
>>>>>>> federico.dambrosio@smartlab.ws> wrote:
>>>>>>>
>>>>>>> Hi Kostas,
>>>>>>>
>>>>>>> yes, I'm using 1.3.2. I'll try the current master and I'll get back
>>>>>>> to you.
>>>>>>>
>>>>>>> 2017-11-03 15:21 GMT+01:00 Kostas Kloudas <
>>>>>>> k.kloudas@data-artisans.com>:
>>>>>>>
>>>>>>>> Hi Federico,
>>>>>>>>
>>>>>>>> I assume that you are using Flink 1.3, right?
>>>>>>>>
>>>>>>>> In this case, in 1.4 we have fixed a bug that seems similar to your
>>>>>>>> case:
>>>>>>>> https://issues.apache.org/jira/browse/FLINK-7756
>>>>>>>>
>>>>>>>> Could you try the current master to see if it fixes your problem?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Kostas
>>>>>>>>
>>>>>>>> On Nov 3, 2017, at 3:12 PM, Federico D'Ambrosio <
>>>>>>>> federico.dambrosio@smartlab.ws> wrote:
>>>>>>>>
>>>>>>>>  Could not find id for
>>>>>>>> entry:

Re: FlinkCEP, circular references and checkpointing failures

Posted by Dawid Wysakowicz <dw...@apache.org>.
This is a problem with serializing your events using Kryo. I'm adding
Gordon to cc, as he was recently working on the serializers. He might be
able to give you more insight into what is going wrong.
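
One thing that may be worth ruling out (a rough sketch, not a diagnosis):
the trace goes through KryoSerializer.copy, which suggests the event type is
handled as a generic type rather than as a Flink POJO. The snippet below
checks a class against a simplified approximation of the POJO rules (public
class, public no-argument constructor, every field either public or
reachable through a getter and setter). All names in it (PojoCheck,
looksLikeFlinkPojo, SensorEvent, OpaqueEvent) are hypothetical, and it is
not Flink's own type extraction logic.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

final class PojoCheck {

    // Simplified approximation of Flink's POJO rules. Assumption: this is NOT
    // Flink's actual type extraction code, only the commonly documented rules.
    static boolean looksLikeFlinkPojo(Class<?> clazz) {
        int classMods = clazz.getModifiers();
        // The class must be public; a nested class must also be static.
        if (!Modifier.isPublic(classMods)) {
            return false;
        }
        if (clazz.isMemberClass() && !Modifier.isStatic(classMods)) {
            return false;
        }
        try {
            clazz.getConstructor(); // requires a public no-argument constructor
        } catch (NoSuchMethodException e) {
            return false;
        }
        // Walk the class hierarchy and inspect every instance field.
        for (Class<?> c = clazz; c != null && c != Object.class; c = c.getSuperclass()) {
            for (Field field : c.getDeclaredFields()) {
                int mods = field.getModifiers();
                if (Modifier.isStatic(mods) || Modifier.isTransient(mods) || field.isSynthetic()) {
                    continue; // static, transient and synthetic fields are ignored
                }
                if (!Modifier.isPublic(mods) && !hasGetterAndSetter(clazz, field)) {
                    return false;
                }
            }
        }
        return true;
    }

    // A non-public field needs a public getX() and a public setX(fieldType).
    private static boolean hasGetterAndSetter(Class<?> clazz, Field field) {
        String name = field.getName();
        String suffix = Character.toUpperCase(name.charAt(0)) + name.substring(1);
        return hasPublicMethod(clazz, "get" + suffix)
                && hasPublicMethod(clazz, "set" + suffix, field.getType());
    }

    private static boolean hasPublicMethod(Class<?> clazz, String name, Class<?>... params) {
        try {
            clazz.getMethod(name, params);
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    // Hypothetical event type that satisfies the rules above.
    public static class SensorEvent {
        public String deviceId;
        private double temperature;

        public SensorEvent() {}

        public double getTemperature() { return temperature; }
        public void setTemperature(double temperature) { this.temperature = temperature; }
    }

    // Hypothetical event type without a no-argument constructor.
    public static class OpaqueEvent {
        private final String deviceId;

        public OpaqueEvent(String deviceId) { this.deviceId = deviceId; }

        public String getDeviceId() { return deviceId; }
    }

    public static void main(String[] args) {
        System.out.println("SensorEvent: " + looksLikeFlinkPojo(SensorEvent.class));
        System.out.println("OpaqueEvent: " + looksLikeFlinkPojo(OpaqueEvent.class));
    }
}
```

Here SensorEvent passes and OpaqueEvent fails, because it has no public
no-argument constructor. If your event class fails a check like this, turning
it into a POJO should let Flink use its POJO serializer for the top-level
type, though whether that avoids the exception above is untested.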

Best,

Dawid

On 25/10/2018 05:41, Shailesh Jain wrote:
> Hi Dawid,
>
> I've upgraded to Flink 1.6.1 and rebased my changes against the tag
> 1.6.1; the only commit on top of 1.6 is this:
> https://github.com/jainshailesh/flink/commit/797e3c4af5b28263fd98fb79daaba97cabf3392c
>
> I ran two separate identical jobs (with and without checkpointing
> enabled). I'm hitting an ArrayIndexOutOfBoundsException (and sometimes
> an NPE) *only when checkpointing (HDFS backend) is enabled*, with the
> stack trace below.
>
> I did see a similar problem with different operators here
> (http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/IndexOutOfBoundsException-on-deserialization-after-updating-to-1-6-1-td23933.html).
> Is this a known issue which is getting addressed?
>
> Any ideas on what could be causing this?
>
> Thanks,
> Shailesh
>
>
> 2018-10-24 17:04:13,365 INFO 
> org.apache.flink.runtime.taskmanager.Task                     -
> SelectCepOperatorMixedTime (1/1) - SelectCepOperatorMixedTime (1/1)
> (3d984b7919342a3886593401088ca2cd) switched from RUNNING to FAILED.
> org.apache.flink.util.FlinkRuntimeException: Failure happened in
> filter function.
>         at org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:731)
>         at org.apache.flink.cep.nfa.NFA.computeNextStates(NFA.java:541)
>         at org.apache.flink.cep.nfa.NFA.doProcess(NFA.java:284)
>         at org.apache.flink.cep.nfa.NFA.process(NFA.java:220)
>         at
> org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.processEvent(AbstractKeyedCEPPatternOperator.java:379)
>         at
> org.apache.flink.cep.operator.AbstractKeyedCEPPatternMixedTimeApproachOperator.processElement(AbstractKeyedCEPPatternMixedTimeApproachOperator.java:45)
>         at
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>         at
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>         at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.WrappingRuntimeException:
> java.lang.ArrayIndexOutOfBoundsException: -1
>         at
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.lambda$materializeMatch$1(SharedBuffer.java:305)
>         at java.util.HashMap.computeIfAbsent(HashMap.java:1127)
>         at
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.materializeMatch(SharedBuffer.java:301)
>         at
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.materializeMatch(SharedBuffer.java:291)
>         at
> org.apache.flink.cep.nfa.NFA$ConditionContext.getEventsForPattern(NFA.java:811)
>         at
> com.stellapps.contrakcep.flink.patterns.critical.AgitatorMalfunctionAfterChillingPattern$1.filter(AgitatorMalfunctionAfterChillingPattern.java:70)
>         at
> com.stellapps.contrakcep.flink.patterns.critical.AgitatorMalfunctionAfterChillingPattern$1.filter(AgitatorMalfunctionAfterChillingPattern.java:62)
>         at org.apache.flink.cep.nfa.NFA.checkFilterCondition(NFA.java:742)
>         at org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:716)
>         ... 10 more
> Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
>         at com.esotericsoftware.kryo.util.IntArray.add(IntArray.java:61)
>         at
> com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:800)
>         at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:655)
>         at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:231)
>         at
> org.apache.flink.cep.nfa.sharedbuffer.Lockable$LockableTypeSerializer.copy(Lockable.java:120)
>         at
> org.apache.flink.cep.nfa.sharedbuffer.Lockable$LockableTypeSerializer.copy(Lockable.java:95)
>         at
> org.apache.flink.api.common.typeutils.base.MapSerializer.copy(MapSerializer.java:113)
>         at
> org.apache.flink.api.common.typeutils.base.MapSerializer.copy(MapSerializer.java:49)
>         at
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:287)
>         at
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:311)
>         at
> org.apache.flink.runtime.state.heap.HeapMapState.get(HeapMapState.java:85)
>         at
> org.apache.flink.runtime.state.UserFacingMapState.get(UserFacingMapState.java:47)
>         at
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.lambda$materializeMatch$1(SharedBuffer.java:303)
>         ... 18 more
>
> On Fri, Sep 28, 2018 at 11:00 AM Shailesh Jain
> <shailesh.jain@stellapps.com <ma...@stellapps.com>> wrote:
>
>     Hi Dawid,
>
>     Thanks for your time on this. The diff should have pointed out
>     only the top 3 commits, but since it did not, it is possible I did
>     not rebase my branch against 1.4.2 correctly. I'll check this out
>     and get back to you if I hit the same issue again.
>
>     Thanks again,
>     Shailesh
>
>     On Thu, Sep 27, 2018 at 1:00 PM Dawid Wysakowicz
>     <dwysakowicz@apache.org <ma...@apache.org>> wrote:
>
>         Hi Shailesh,
>
>         I am afraid it is going to be hard to help you, as this branch
>         differs significantly from the 1.4.2 release (I've done a diff
>         between your branch and tag/release-1.4.2). Moreover, the code
>         in the branch you've provided still does not correspond to the
>         lines in the exception you've posted previously. Could you
>         check if the problem occurs on vanilla Flink as well?
>
>         Best,
>
>         Dawid
>
>
>         On 27/09/18 08:22, Shailesh Jain wrote:
>>         Hi Dawid,
>>
>>         Yes, it is version 1.4.2. We are running vanilla flink, but
>>         have added a couple of changes in the CEP operator
>>         specifically (top 3 commits here:
>>         https://github.com/jainshailesh/flink/commits/poc_on_1.4.2).
>>         Changes I've made to CEP operators do not touch the
>>         checkpointing path, just overloading the operator for a
>>         specific way of handling event time.
>>
>>         We are hitting this in production, so I'm not sure it'll be
>>         feasible to move to 1.6.0 immediately, but eventually yes.
>>
>>         Thanks,
>>         Shailesh
>>
>>         On Wed, Sep 26, 2018 at 5:44 PM Dawid Wysakowicz
>>         <dwysakowicz@apache.org <ma...@apache.org>> wrote:
>>
>>             Hi Shailesh,
>>
>>             Are you sure you are using version 1.4.2? Do you run
>>             vanilla Flink, or have you introduced some changes? I am
>>             asking because the line numbers in the stack trace do not
>>             align with the source code for 1.4.2.
>>
>>             Also, it is a different exception than the one in the
>>             issue you've linked, so if it is a problem then it is
>>             definitely a different one. Lastly, I would recommend
>>             upgrading to the newest version, as we have rewritten the
>>             SharedBuffer implementation in 1.6.0.
>>
>>             Best,
>>
>>             Dawid
>>
>>
>>             On 26/09/18 13:50, Shailesh Jain wrote:
>>>             Hi,
>>>
>>>             I think I've hit this same issue on a 3 node standalone
>>>             cluster (1.4.2) using HDFS (2.8.4) as state backend.
>>>
>>>             2018-09-26 17:07:39,370 INFO 
>>>             org.apache.flink.runtime.taskmanager.Task                    
>>>             - Attempting to fail task externally SelectCepOperator
>>>             (1/1) (3bec4aa1ef2226c4e0c5ff7b3860d340).
>>>             2018-09-26 17:07:39,370 INFO 
>>>             org.apache.flink.runtime.taskmanager.Task                    
>>>             - SelectCepOperator (1/1)
>>>             (3bec4aa1ef2226c4e0c5ff7b3860d340) switched from RUNNING
>>>             to FAILED.
>>>             AsynchronousException{java.lang.Exception: Could not
>>>             materialize checkpoint 6 for operator SelectCepOperator
>>>             (1/1).}
>>>                 at
>>>             org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:948)
>>>                 at
>>>             java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>                 at
>>>             java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>                 at
>>>             java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>                 at
>>>             java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>                 at java.lang.Thread.run(Thread.java:748)
>>>             Caused by: java.lang.Exception: Could not materialize
>>>             checkpoint 6 for operator SelectCepOperator (1/1).
>>>                 ... 6 more
>>>             Caused by: java.util.concurrent.ExecutionException:
>>>             java.lang.NullPointerException
>>>                 at
>>>             java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>>                 at
>>>             java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>>                 at
>>>             org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>>>                 at
>>>             org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
>>>                 ... 5 more
>>>                 Suppressed: java.lang.Exception: Could not properly
>>>             cancel managed keyed state future.
>>>                     at
>>>             org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:91)
>>>                     at
>>>             org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:976)
>>>                     at
>>>             org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:939)
>>>                     ... 5 more
>>>                 Caused by: java.util.concurrent.ExecutionException:
>>>             java.lang.NullPointerException
>>>                     at
>>>             java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>>                     at
>>>             java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>>                     at
>>>             org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>>>                     at
>>>             org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:66)
>>>                     at
>>>             org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:89)
>>>                     ... 7 more
>>>                 Caused by: java.lang.NullPointerException
>>>                     at
>>>             org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:954)
>>>                     at
>>>             org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:825)
>>>                     at
>>>             org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:888)
>>>                     at
>>>             org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:820)
>>>                     at
>>>             org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot.writeMappingsInKeyGroup(CopyOnWriteStateTableSnapshot.java:196)
>>>                     at
>>>             org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:390)
>>>                     at
>>>             org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:339)
>>>                     at
>>>             org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources.call(AbstractAsyncCallableWithResources.java:75)
>>>                     at
>>>             java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>                     at
>>>             org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
>>>                     at
>>>             org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
>>>                     ... 5 more
>>>                 [CIRCULAR REFERENCE:java.lang.NullPointerException]
>>>
>>>             Any ideas on why I'm hitting this, especially when this
>>>             (https://issues.apache.org/jira/browse/FLINK-7756) says
>>>             it has been fixed in 1.4.2?
>>>
>>>             On Sat, Nov 4, 2017 at 12:34 AM Federico D'Ambrosio
>>>             <federico.dambrosio@smartlab.ws
>>>             <ma...@smartlab.ws>> wrote:
>>>
>>>                 Thank you very much for your steady response, Kostas!
>>>
>>>                 Cheers,
>>>                 Federico
>>>
>>>                 2017-11-03 16:26 GMT+01:00 Kostas Kloudas
>>>                 <k.kloudas@data-artisans.com
>>>                 <ma...@data-artisans.com>>:
>>>
>>>                     Hi Federico,
>>>
>>>                     Thanks for trying it out! 
>>>                     Great to hear that your problem was fixed!
>>>
>>>                     The feature freeze for the release is going to
>>>                     be next week, and I would expect 1 or 2 more
>>>                     weeks of testing.
>>>                     So I would say in 2.5 weeks. But this is of
>>>                     course subject to potential issues we may find
>>>                     during testing.
>>>
>>>                     Cheers,
>>>                     Kostas
>>>
>>>>                     On Nov 3, 2017, at 4:22 PM, Federico D'Ambrosio
>>>>                     <federico.dambrosio@smartlab.ws
>>>>                     <ma...@smartlab.ws>> wrote:
>>>>
>>>>                     Hi Kostas,
>>>>
>>>>                     I just tried running the same job with
>>>>                     1.4-SNAPSHOT for 10 minutes and it didn't
>>>>                     crash, so it was the same underlying issue as
>>>>                     in the JIRA you linked.
>>>>
>>>>                     Do you happen to know when the 1.4 stable
>>>>                     release is expected?
>>>>
>>>>                     Thank you very much,
>>>>                     Federico
>>>>
>>>>                     2017-11-03 15:25 GMT+01:00 Kostas Kloudas
>>>>                     <k.kloudas@data-artisans.com
>>>>                     <ma...@data-artisans.com>>:
>>>>
>>>>                         Perfect! Thanks a lot!
>>>>
>>>>                         Kostas
>>>>
>>>>>                         On Nov 3, 2017, at 3:23 PM, Federico
>>>>>                         D'Ambrosio <federico.dambrosio@smartlab.ws
>>>>>                         <ma...@smartlab.ws>>
>>>>>                         wrote:
>>>>>
>>>>>                         Hi Kostas,
>>>>>
>>>>>                         Yes, I'm using 1.3.2. I'll try the current
>>>>>                         master and get back to you.
>>>>>
>>>>>                         2017-11-03 15:21 GMT+01:00 Kostas Kloudas
>>>>>                         <k.kloudas@data-artisans.com
>>>>>                         <ma...@data-artisans.com>>:
>>>>>
>>>>>                             Hi Federico,
>>>>>
>>>>>                             I assume that you are using Flink 1.3,
>>>>>                             right?
>>>>>
>>>>>                             In this case, in 1.4 we have fixed a
>>>>>                             bug that seems similar to your case:
>>>>>                             https://issues.apache.org/jira/browse/FLINK-7756
>>>>>
>>>>>                             Could you try the current master to
>>>>>                             see if it fixes your problem?
>>>>>
>>>>>                             Thanks,
>>>>>                             Kostas
>>>>>
>>>>>>                             On Nov 3, 2017, at 3:12 PM, Federico
>>>>>>                             D'Ambrosio
>>>>>>                             <federico.dambrosio@smartlab.ws
>>>>>>                             <ma...@smartlab.ws>>
>>>>>>                             wrote:
>>>>>>
>>>>>>                             Could not find id for entry:
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>                         -- 
>>>>>                         Federico D'Ambrosio
>>>>
>>>>
>>>>
>>>>
>>>>                     -- 
>>>>                     Federico D'Ambrosio
>>>
>>>
>>>
>>>
>>>                 -- 
>>>                 Federico D'Ambrosio
>>>
>>
>