Posted to user@flink.apache.org by Federico D'Ambrosio <fe...@smartlab.ws> on 2017/11/03 14:12:36 UTC

FlinkCEP, circular references and checkpointing failures

Hello everyone,

I'm experimenting a bit with FlinkCEP and I'm noticing weird failures when
it comes to checkpoints, specifically when within clause windows close at
the same time a (synchronous, both on Fs and RocksDB, stored in HDFS)
checkpoint occurs.

The following is the relevant code:

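import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

val env: StreamExecutionEnvironment =
  StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(60000) // Checkpoints every minute
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints-dir"))

// Pattern
val pattern =
  Pattern
    .begin[EventWithId]("flying").oneOrMore
    .where(_.event.instantValues.altitude >= 37000)
    .notNext("disappearing")
    .where(_.event.instantValues.altitude >= 37000)
    .within(Time.minutes(1))

// Associate KeyedStream with pattern to be detected
// (streamById: keyed stream of EventWithId, defined elsewhere in the job)
val patternStream = CEP.pattern(streamById, pattern)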
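
For comparison, the RocksDB variant mentioned above would presumably differ
only in the state backend line. This is a sketch, assuming the
flink-statebackend-rocksdb artifact is on the classpath and the same HDFS
checkpoint directory; the exact RocksDB settings used in the failing job are
not shown here.

```scala
// Assumption: flink-statebackend-rocksdb is on the classpath; the RocksDB
// options actually used in the failing job are not shown in this post.
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend

// Same checkpoint directory on HDFS, RocksDB instead of FsStateBackend
env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints-dir"))
```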

which fails on the second checkpoint with the following stack trace:

AsynchronousException{java.lang.Exception: Could not materialize checkpoint 2 for operator KeyedCEPPatternOperator -> alert-select -> Sink: notification-sink-1 (1/1).}
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.Exception: Could not materialize checkpoint 2 for operator KeyedCEPPatternOperator -> alert-select -> Sink: notification-sink-1 (1/1).
        ... 6 more
Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG","flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","callsign":"JZA593","speed":370,"altitude":38000,"course":287,"time":"2017-11-03 13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"}, 1509716685000, 0), [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)], 1)
        at java.util.concurrent.FutureTask.report(FutureTask.java:122)
        at java.util.concurrent.FutureTask.get(FutureTask.java:192)
        at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
        ... 5 more
        Suppressed: java.lang.Exception: Could not properly cancel managed keyed state future.
                at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90)
                at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023)
                at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:961)
                ... 5 more
        Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG","flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","callsign":"JZA593","speed":370,"altitude":38000,"course":287,"time":"2017-11-03 13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"}, 1509716685000, 0), [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)], 1)
                at java.util.concurrent.FutureTask.report(FutureTask.java:122)
                at java.util.concurrent.FutureTask.get(FutureTask.java:192)
                at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
                at org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85)
                at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:88)
                ... 7 more
        Caused by: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG","flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","callsign":"JZA593","speed":370,"altitude":38000,"course":287,"time":"2017-11-03 13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"}, 1509716685000, 0), [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)], 1)
                at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
                at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:971)
                at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:838)
                at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:928)
                at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:852)
                at org.apache.flink.runtime.state.heap.NestedMapsStateTable$NestedMapsStateTableSnapshot.writeMappingsInKeyGroup(NestedMapsStateTable.java:355)
                at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:347)
                at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:329)
                at org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.call(AbstractAsyncIOCallable.java:72)
                at java.util.concurrent.FutureTask.run(FutureTask.java:266)
                at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.snapshot(HeapKeyedStateBackend.java:372)
                at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:397)
                at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1162)
                at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1094)
                at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:654)
                at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:590)
                at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:543)
                at org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:378)
                at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:228)
                at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:183)
                at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:213)
                at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
                at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
                at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
                ... 1 more
        [CIRCULAR REFERENCE:java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG","flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","callsign":"JZA593","speed":370,"altitude":38000,"course":287,"time":"2017-11-03 13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"}, 1509716685000, 0), [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)], 1)]

11/03/2017 13:46:46     Job execution switched to status FAILING.
AsynchronousException{java.lang.Exception: Could not materialize checkpoint 2 for operator KeyedCEPPatternOperator -> alert-select -> Sink: notification-sink-1 (1/1).}
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.Exception: Could not materialize checkpoint 2 for operator KeyedCEPPatternOperator -> alert-select -> Sink: notification-sink-1 (1/1).
        ... 6 more
Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG","flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","callsign":"JZA593","speed":370,"altitude":38000,"course":287,"time":"2017-11-03 13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"}, 1509716685000, 0), [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)], 1)
        at java.util.concurrent.FutureTask.report(FutureTask.java:122)
        at java.util.concurrent.FutureTask.get(FutureTask.java:192)
        at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
        ... 5 more
        Suppressed: java.lang.Exception: Could not properly cancel managed keyed state future.
                at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90)
                at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023)
                at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:961)
                ... 5 more
        Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG","flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","callsign":"JZA593","speed":370,"altitude":38000,"course":287,"time":"2017-11-03 13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"}, 1509716685000, 0), [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)], 1)
                at java.util.concurrent.FutureTask.report(FutureTask.java:122)
                at java.util.concurrent.FutureTask.get(FutureTask.java:192)
                at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
                at org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85)
                at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:88)
                ... 7 more
        Caused by: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG","flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","callsign":"JZA593","speed":370,"altitude":38000,"course":287,"time":"2017-11-03 13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"}, 1509716685000, 0), [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)], 1)
                at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
                at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:971)
                at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:838)
                at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:928)
                at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:852)
                at org.apache.flink.runtime.state.heap.NestedMapsStateTable$NestedMapsStateTableSnapshot.writeMappingsInKeyGroup(NestedMapsStateTable.java:355)
                at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:347)
                at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:329)
                at org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.call(AbstractAsyncIOCallable.java:72)
                at java.util.concurrent.FutureTask.run(FutureTask.java:266)
                at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.snapshot(HeapKeyedStateBackend.java:372)
                at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:397)
                at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1162)
                at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1094)
                at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:654)
                at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:590)
                at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:543)
                at org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:378)
                at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:228)
                at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:183)
                at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:213)
                at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
                at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
                at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
                ... 1 more
        [CIRCULAR REFERENCE:java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG","flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","callsign":"JZA593","speed":370,"altitude":38000,"course":287,"time":"2017-11-03 13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"}, 1509716685000, 0), [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)], 1)]
11/03/2017 13:46:46     Source: json-parser(1/1) switched to CANCELING
11/03/2017 13:46:46     Process(1/1) switched to CANCELING
11/03/2017 13:46:46     Source: json-parser(1/1) switched to CANCELED
11/03/2017 13:46:46     Process(1/1) switched to CANCELED
11/03/2017 13:46:46     Job execution switched to status RESTARTING.
11/03/2017 13:46:56     Job execution switched to status CREATED.
11/03/2017 13:46:56     Job execution switched to status RUNNING.
11/03/2017 13:46:56     Source: json-parser(1/1) switched to SCHEDULED
11/03/2017 13:46:56     Process(1/1) switched to SCHEDULED
11/03/2017 13:46:56     KeyedCEPPatternOperator -> alert-select -> Sink: notification-sink-1(1/1) switched to SCHEDULED
11/03/2017 13:46:56     Source: json-parser(1/1) switched to DEPLOYING
11/03/2017 13:46:56     Process(1/1) switched to DEPLOYING
11/03/2017 13:46:56     KeyedCEPPatternOperator -> alert-select -> Sink: notification-sink-1(1/1) switched to DEPLOYING
11/03/2017 13:46:56     Process(1/1) switched to RUNNING
11/03/2017 13:46:56     KeyedCEPPatternOperator -> alert-select -> Sink: notification-sink-1(1/1) switched to RUNNING
11/03/2017 13:46:56     Source: json-parser(1/1) switched to RUNNING
11/03/2017 13:46:57     KeyedCEPPatternOperator -> alert-select -> Sink: notification-sink-1(1/1) switched to FAILED
java.lang.IllegalStateException: Could not initialize keyed state backend.
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:321)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:217)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:676)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:663)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.StreamCorruptedException: invalid type code: 00
        at java.io.ObjectInputStream$BlockDataInputStream.readBlockHeader(ObjectInputStream.java:2519)
        at java.io.ObjectInputStream$BlockDataInputStream.refill(ObjectInputStream.java:2553)
        at java.io.ObjectInputStream$BlockDataInputStream.skipBlockData(ObjectInputStream.java:2455)
        at java.io.ObjectInputStream.skipCustomData(ObjectInputStream.java:1951)
        at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1621)
        at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
        at org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeCondition(NFA.java:1211)
        at org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeStates(NFA.java:1169)
        at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:957)
        at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:852)
        at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders$StateTableByKeyGroupReaderV2V3.readMappingsInKeyGroup(StateTableByKeyGroupReaders.java:132)
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:518)
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:397)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:772)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:311)
        ... 6 more

11/03/2017 13:46:57     Job execution switched to status FAILING.
java.lang.IllegalStateException: Could not initialize keyed state backend.
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:321)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:217)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:676)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:663)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.StreamCorruptedException: invalid type code: 00
        at java.io.ObjectInputStream$BlockDataInputStream.readBlockHeader(ObjectInputStream.java:2519)
        at java.io.ObjectInputStream$BlockDataInputStream.refill(ObjectInputStream.java:2553)
        at java.io.ObjectInputStream$BlockDataInputStream.skipBlockData(ObjectInputStream.java:2455)
        at java.io.ObjectInputStream.skipCustomData(ObjectInputStream.java:1951)
        at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1621)
        at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
        at org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeCondition(NFA.java:1211)
        at org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeStates(NFA.java:1169)
        at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:957)
        at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:852)
        at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders$StateTableByKeyGroupReaderV2V3.readMappingsInKeyGroup(StateTableByKeyGroupReaders.java:132)
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:518)
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:397)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:772)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:311)
        ... 6 more


What is happening here? Am I doing something wrong? Is there some sort of
conflict between the within clause deadlines and the checkpoint deadlines?
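
As a quick check of the timing involved, here is a minimal sketch that
decodes the timestamp 1509716685000 carried by the failing SharedBufferEntry
and computes when a within(Time.minutes(1)) window starting at that event
would end. It assumes the value is event time in epoch milliseconds and that
this event opened the partial match; the object name WithinDeadline is
hypothetical. Read as UTC, the decoded instant matches the event's own
"time" field (2017-11-03 13:44:45).

```scala
import java.time.{Duration, Instant}

// Hypothetical helper: decodes the event timestamp from the stack trace and
// computes the end of a 1-minute within() window starting at that event.
// Assumption: the timestamp is event time in epoch milliseconds and this
// event is the first one of the partial match (simplified model).
object WithinDeadline {
  // Timestamp copied from the ValueTimeWrapper in the stack trace
  val entryTimestampMs: Long = 1509716685000L

  // Length of the pattern's within(Time.minutes(1)) clause
  val window: Duration = Duration.ofMinutes(1)

  val eventTime: Instant = Instant.ofEpochMilli(entryTimestampMs)

  // Once event time reaches this instant, the partial match started by
  // this event can no longer complete within the window
  val deadline: Instant = eventTime.plus(window)

  def main(args: Array[String]): Unit = {
    println(s"event time:      $eventTime") // 2017-11-03T13:44:45Z
    println(s"within deadline: $deadline")  // 2017-11-03T13:45:45Z
  }
}
```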

I found the following similar JIRA pages, but none of them mention
circular references:
https://issues.apache.org/jira/browse/FLINK-6321
https://issues.apache.org/jira/browse/FLINK-7484
https://issues.apache.org/jira/browse/FLINK-7756

Kind Regards,
Federico D'Ambrosio

Re: FlinkCEP, circular references and checkpointing failures

Posted by Federico D'Ambrosio <fe...@smartlab.ws>.
I'm sorry, I realized that the stack trace was poorly formatted; here it is
with better formatting:

AsynchronousException{java.lang.Exception: Could not materialize checkpoint 2 for operator KeyedCEPPatternOperator -> alert-select -> Sink: notification-sink-1 (1/1).}
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.Exception: Could not materialize checkpoint 2 for operator KeyedCEPPatternOperator -> alert-select -> Sink: notification-sink-1 (1/1).
        ... 6 more
Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG","flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","callsign":"JZA593","speed":370,"altitude":38000,"course":287,"time":"2017-11-03 13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"}, 1509716685000, 0), [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)], 1)
        at java.util.concurrent.FutureTask.report(FutureTask.java:122)
        at java.util.concurrent.FutureTask.get(FutureTask.java:192)
        at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
        ... 5 more
        Suppressed: java.lang.Exception: Could not properly cancel managed keyed state future.
                at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90)
                at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023)
                at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:961)
                ... 5 more
        Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG","flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","callsign":"JZA593","speed":370,"altitude":38000,"course":287,"time":"2017-11-03 13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"}, 1509716685000, 0), [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)], 1)
                at java.util.concurrent.FutureTask.report(FutureTask.java:122)
                at java.util.concurrent.FutureTask.get(FutureTask.java:192)
                at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
                at org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85)
                at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:88)
                ... 7 more
        Caused by: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG","flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","callsign":"JZA593","speed":370,"altitude":38000,"course":287,"time":"2017-11-03 13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"}, 1509716685000, 0), [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)], 1)
                at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
                at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:971)
                at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:838)
                at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:928)
                at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:852)
                at org.apache.flink.runtime.state.heap.NestedMapsStateTable$NestedMapsStateTableSnapshot.writeMappingsInKeyGroup(NestedMapsStateTable.java:355)
                at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:347)
                at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:329)
                at org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.call(AbstractAsyncIOCallable.java:72)
                at java.util.concurrent.FutureTask.run(FutureTask.java:266)
                at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.snapshot(HeapKeyedStateBackend.java:372)
                at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:397)
                at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1162)
                at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1094)
                at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:654)
                at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:590)
                at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:543)
                at org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:378)
                at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:228)
                at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:183)
                at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:213)
                at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
                at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
                at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
                ... 1 more
        [CIRCULAR REFERENCE:java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG","flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","callsign":"JZA593","speed":370,"altitude":38000,"course":287,"time":"2017-11-03 13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"}, 1509716685000, 0), [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)], 1)]

11/03/2017 13:46:46     Job execution switched to status FAILING.
AsynchronousException{java.lang.Exception: Could not materialize checkpoint 2 for operator KeyedCEPPatternOperator -> alert-select -> Sink: notification-sink-1 (1/1).}
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.Exception: Could not materialize checkpoint 2 for operator KeyedCEPPatternOperator -> alert-select -> Sink: notification-sink-1 (1/1).
        ... 6 more
Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG","flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","callsign":"JZA593","speed":370,"altitude":38000,"course":287,"time":"2017-11-03 13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"}, 1509716685000, 0), [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)], 1)
        at java.util.concurrent.FutureTask.report(FutureTask.java:122)
        at java.util.concurrent.FutureTask.get(FutureTask.java:192)
        at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
        ... 5 more
        Suppressed: java.lang.Exception: Could not properly cancel managed keyed state future.
                at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90)
                at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023)
                at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:961)
                ... 5 more
        Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG","flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","callsign":"JZA593","speed":370,"altitude":38000,"course":287,"time":"2017-11-03 13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"}, 1509716685000, 0), [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)], 1)
                at java.util.concurrent.FutureTask.report(FutureTask.java:122)
                at java.util.concurrent.FutureTask.get(FutureTask.java:192)
                at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
                at org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85)
                at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:88)
                ... 7 more
        Caused by: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG","flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","callsign":"JZA593","speed":370,"altitude":38000,"course":287,"time":"2017-11-03 13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"}, 1509716685000, 0), [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)], 1)
                at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
                at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:971)
                at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:838)
                at
org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:928)
                at
org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:852)
                at
org.apache.flink.runtime.state.heap.NestedMapsStateTable$NestedMapsStateTableSnapshot.writeMappingsInKeyGroup(NestedMapsStateTable.java:355)
                at
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:347)
                at
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:329)
                at
org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.call(AbstractAsyncIOCallable.java:72)
                at java.util.concurrent.FutureTask.run(FutureTask.java:266)
                at
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.snapshot(HeapKeyedStateBackend.java:372)
                at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:397)
                at
org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1162)
                at
org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1094)
                at
org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:654)
                at
org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:590)
                at
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:543)
                at
org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:378)
                at
org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:228)
                at
org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:183)
                at
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:213)
                at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
                at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
                at
org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
                ... 1 more
        [CIRCULAR REFERENCE:java.lang.IllegalStateException: Could not find
id for entry:
SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG","flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","callsign":"JZA593","speed":370,"altitude":38000,"course":287,"time":"2017-11-03
13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"},
1509716685000, 0), [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)],
1)]
11/03/2017 13:46:46     Source: json-parser(1/1) switched to CANCELING
11/03/2017 13:46:46     Process(1/1) switched to CANCELING
11/03/2017 13:46:46     Source: json-parser(1/1) switched to CANCELED
11/03/2017 13:46:46     Process(1/1) switched to CANCELED
11/03/2017 13:46:46     Job execution switched to status RESTARTING.
11/03/2017 13:46:56     Job execution switched to status CREATED.
11/03/2017 13:46:56     Job execution switched to status RUNNING.
11/03/2017 13:46:56     Source: json-parser(1/1) switched to SCHEDULED
11/03/2017 13:46:56     Process(1/1) switched to SCHEDULED
11/03/2017 13:46:56     KeyedCEPPatternOperator -> alert-select -> Sink: notification-sink-1(1/1) switched to SCHEDULED
11/03/2017 13:46:56     Source: json-parser(1/1) switched to DEPLOYING
11/03/2017 13:46:56     Process(1/1) switched to DEPLOYING
11/03/2017 13:46:56     KeyedCEPPatternOperator -> alert-select -> Sink: notification-sink-1(1/1) switched to DEPLOYING
11/03/2017 13:46:56     Process(1/1) switched to RUNNING
11/03/2017 13:46:56     KeyedCEPPatternOperator -> alert-select -> Sink: notification-sink-1(1/1) switched to RUNNING
11/03/2017 13:46:56     Source: json-parser(1/1) switched to RUNNING
11/03/2017 13:46:57     KeyedCEPPatternOperator -> alert-select -> Sink: notification-sink-1(1/1) switched to FAILED
java.lang.IllegalStateException: Could not initialize keyed state backend.
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:321)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:217)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:676)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:663)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.StreamCorruptedException: invalid type code: 00
        at java.io.ObjectInputStream$BlockDataInputStream.readBlockHeader(ObjectInputStream.java:2519)
        at java.io.ObjectInputStream$BlockDataInputStream.refill(ObjectInputStream.java:2553)
        at java.io.ObjectInputStream$BlockDataInputStream.skipBlockData(ObjectInputStream.java:2455)
        at java.io.ObjectInputStream.skipCustomData(ObjectInputStream.java:1951)
        at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1621)
        at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
        at org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeCondition(NFA.java:1211)
        at org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeStates(NFA.java:1169)
        at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:957)
        at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:852)
        at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders$StateTableByKeyGroupReaderV2V3.readMappingsInKeyGroup(StateTableByKeyGroupReaders.java:132)
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:518)
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:397)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:772)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:311)
        ... 6 more

11/03/2017 13:46:57     Job execution switched to status FAILING.
java.lang.IllegalStateException: Could not initialize keyed state backend.
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:321)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:217)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:676)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:663)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.StreamCorruptedException: invalid type code: 00
        at java.io.ObjectInputStream$BlockDataInputStream.readBlockHeader(ObjectInputStream.java:2519)
        at java.io.ObjectInputStream$BlockDataInputStream.refill(ObjectInputStream.java:2553)
        at java.io.ObjectInputStream$BlockDataInputStream.skipBlockData(ObjectInputStream.java:2455)
        at java.io.ObjectInputStream.skipCustomData(ObjectInputStream.java:1951)
        at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1621)
        at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
        at org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeCondition(NFA.java:1211)
        at org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeStates(NFA.java:1169)
        at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:957)
        at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:852)
        at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders$StateTableByKeyGroupReaderV2V3.readMappingsInKeyGroup(StateTableByKeyGroupReaders.java:132)
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:518)
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:397)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:772)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:311)
        ... 6 more

2017-11-03 15:12 GMT+01:00 Federico D'Ambrosio <
federico.dambrosio@smartlab.ws>:

> Hello everyone,
>
> I'm a bit experimenting with FlinkCEP and I'm noticing weird failures when
> it comes to checkpoints and within clauses windows closing at the same time
> a (synchronous, both on Fs and RocksDB, stored in hdfs) checkpoint occurs.
>
> The following is the relevant code:
>
> val env : StreamExecutionEnvironment = StreamExecutionEnvironment.
> getExecutionEnvironment
> env.enableCheckpointing(60000) //Checkpoints every minute
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints-dir"))
>
> //Pattern
> val pattern =
>   Pattern
>     .begin[EventWithId]("flying").oneOrMore.where(_.event.instantValues.altitude
> >= 37000)
>     .notNext("disappearing").where(_.event.instantValues.altitude >=
> 37000).within(Time.minutes(1))
>
> // Associate KeyedStream with pattern to be detected
> val patternStream  = CEP.pattern(streamById, pattern)
>
> which causes failure on the second checkpoint with the following exception
> stack trace:
>
> AsynchronousException{java.lang.Exception: Could not materialize
> checkpoint 2 fo                                                       r
> operator KeyedCEPPatternOperator -> alert-select -> Sink:
> notification-sink-1
> (1/1).}
>         at org.apache.flink.streaming.runtime.tasks.StreamTask$
> AsyncCheckpointRu
> nnable.run(StreamTask.java:970)
>         at java.util.concurrent.Executors$RunnableAdapter.
> call(Executors.java:51
> 1)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.
> java:1142)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor
> .java:617)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.Exception: Could not materialize checkpoint 2 for
> operator
> KeyedCEPPatternOperator -> alert-select -> Sink: notification-sink-1 (1/1).
>         ... 6 more
> Caused by: java.util.concurrent.ExecutionException:
> java.lang.IllegalStateExcept
> ion: Could not find id for entry: SharedBufferEntry(
> ValueTimeWrapper({"origin":"
> YUL","destination":"YWG","flight":"AC8593","aircraft":"
> CRJ7","registration":"C-G
> OJZ","callsign":"JZA593","speed":370,"altitude":38000,"
> course":287,"time":"2017-
> 11-03 13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"},
> 150971668500                                                       0, 0),
> [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)], 1)
>         at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>         at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>         at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(
> FutureUtil.java:4                                                       3)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask$
> AsyncCheckpointRu
> nnable.run(StreamTask.java:897)
>         ... 5 more
>         Suppressed: java.lang.Exception: Could not properly cancel managed
> keyed                                                        state future.
>                 at org.apache.flink.streaming.api.operators.
> OperatorSnapshotResu
> lt.cancel(OperatorSnapshotResult.java:90)
>                 at org.apache.flink.streaming.runtime.tasks.StreamTask$
> AsyncChec
> kpointRunnable.cleanup(StreamTask.java:1023)
>                 at org.apache.flink.streaming.runtime.tasks.StreamTask$
> AsyncChec
> kpointRunnable.run(StreamTask.java:961)
>                 ... 5 more
>         Caused by: java.util.concurrent.ExecutionException:
> java.lang.IllegalSta
> teException: Could not find id for entry: SharedBufferEntry(
> ValueTimeWrapper({"o
> rigin":"YUL","destination":"YWG","flight":"AC8593","
> aircraft":"CRJ7","registrati
> on":"C-GOJZ","callsign":"JZA593","speed":370,"altitude"
> :38000,"course":287,"time
> ":"2017-11-03 13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"},
> 1509                                                       716685000, 0),
> [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)], 1)
>                 at java.util.concurrent.FutureTask.report(FutureTask.
> java:122)
>                 at java.util.concurrent.FutureTask.get(FutureTask.
> java:192)
>                 at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(
> FutureUti                                                       l.java:43)
>                 at org.apache.flink.runtime.state.StateUtil.
> discardStateFuture(S
> tateUtil.java:85)
>                 at org.apache.flink.streaming.api.operators.
> OperatorSnapshotResu
> lt.cancel(OperatorSnapshotResult.java:88)
>                 ... 7 more
>         Caused by: java.lang.IllegalStateException: Could not find id for
> entry:
> SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG","
> flight"
> :"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","
> callsign":"JZA593","speed":
> 370,"altitude":38000,"course":287,"time":"2017-11-03
> 13:44:45.000","lat":47.9129
> ,"lon":-87.2922,"_id":"AC8593-1"}, 1509716685000, 0),
> [SharedBufferEdge(null, 5)
> , SharedBufferEdge(null, 6)], 1)
>                 at org.apache.flink.util.Preconditions.checkState(
> Preconditions.
> java:195)
>                 at org.apache.flink.cep.nfa.SharedBuffer$
> SharedBufferSerializer.
> serialize(SharedBuffer.java:971)
>                 at org.apache.flink.cep.nfa.SharedBuffer$
> SharedBufferSerializer.
> serialize(SharedBuffer.java:838)
>                 at org.apache.flink.cep.nfa.NFA$
> NFASerializer.serialize(NFA.java
> :928)
>                 at org.apache.flink.cep.nfa.NFA$
> NFASerializer.serialize(NFA.java
> :852)
>                 at org.apache.flink.runtime.state.heap.
> NestedMapsStateTable$Nest
> edMapsStateTableSnapshot.writeMappingsInKeyGroup(
> NestedMapsStateTable.java:355)
>                 at org.apache.flink.runtime.state.heap.
> HeapKeyedStateBackend$1.p
> erformOperation(HeapKeyedStateBackend.java:347)
>                 at org.apache.flink.runtime.state.heap.
> HeapKeyedStateBackend$1.p
> erformOperation(HeapKeyedStateBackend.java:329)
>                 at org.apache.flink.runtime.io.
> async.AbstractAsyncIOCallable.cal
> l(AbstractAsyncIOCallable.java:72)
>                 at java.util.concurrent.FutureTask.run(FutureTask.
> java:266)
>                 at org.apache.flink.runtime.state.heap.
> HeapKeyedStateBackend.sna
> pshot(HeapKeyedStateBackend.java:372)
>                 at org.apache.flink.streaming.api.operators.
> AbstractStreamOperat
> or.snapshotState(AbstractStreamOperator.java:397)
>                 at org.apache.flink.streaming.runtime.tasks.StreamTask$
> Checkpoin
> tingOperation.checkpointStreamOperator(StreamTask.java:1162)
>                 at org.apache.flink.streaming.runtime.tasks.StreamTask$
> Checkpoin
> tingOperation.executeCheckpointing(StreamTask.java:1094)
>                 at org.apache.flink.streaming.runtime.tasks.StreamTask.
> checkpoin
> tState(StreamTask.java:654)
>                 at org.apache.flink.streaming.runtime.tasks.StreamTask.
> performCh
> eckpoint(StreamTask.java:590)
>                 at org.apache.flink.streaming.runtime.tasks.StreamTask.
> triggerCh
> eckpointOnBarrier(StreamTask.java:543)
>                 at org.apache.flink.streaming.runtime.io.BarrierBuffer.
> notifyChe
> ckpoint(BarrierBuffer.java:378)
>                 at org.apache.flink.streaming.runtime.io.BarrierBuffer.
> processBa
> rrier(BarrierBuffer.java:228)
>                 at org.apache.flink.streaming.runtime.io.BarrierBuffer.
> getNextNo
> nBlocked(BarrierBuffer.java:183)
>                 at org.apache.flink.streaming.runtime.io.
> StreamInputProcessor.pr
> ocessInput(StreamInputProcessor.java:213)
>                 at org.apache.flink.streaming.runtime.tasks.
> OneInputStreamTask.r
> un(OneInputStreamTask.java:69)
>                 at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(St
> reamTask.java:263)
>                 at org.apache.flink.runtime.taskmanager.Task.run(Task.
> java:702)
>                 ... 1 more
>         [CIRCULAR REFERENCE:java.lang.IllegalStateException: Could not
> find id f                                                       or entry:
> SharedBufferEntry(ValueTimeWrapper({"origin":"
> YUL","destination":"YWG"
> ,"flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ"
> ,"callsign":"JZA593"
> ,"speed":370,"altitude":38000,"course":287,"time":"2017-11-03
> 13:44:45.000","lat
> ":47.9129,"lon":-87.2922,"_id":"AC8593-1"}, 1509716685000, 0),
> [SharedBufferEdge
> (null, 5), SharedBufferEdge(null, 6)], 1)]
>
> 11/03/2017 13:46:46     Job execution switched to status FAILING.
> AsynchronousException{java.lang.Exception: Could not materialize
> checkpoint 2 fo                                                       r
> operator KeyedCEPPatternOperator -> alert-select -> Sink:
> notification-sink-1
> (1/1).}
>         at org.apache.flink.streaming.runtime.tasks.StreamTask$
> AsyncCheckpointRu
> nnable.run(StreamTask.java:970)
>         at java.util.concurrent.Executors$RunnableAdapter.
> call(Executors.java:51
> 1)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.
> java:1142)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor
> .java:617)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.Exception: Could not materialize checkpoint 2 for
> operator
> KeyedCEPPatternOperator -> alert-select -> Sink: notification-sink-1 (1/1).
>         ... 6 more
> Caused by: java.util.concurrent.ExecutionException:
> java.lang.IllegalStateExcept
> ion: Could not find id for entry: SharedBufferEntry(
> ValueTimeWrapper({"origin":"
> YUL","destination":"YWG","flight":"AC8593","aircraft":"
> CRJ7","registration":"C-G
> OJZ","callsign":"JZA593","speed":370,"altitude":38000,"
> course":287,"time":"2017-
> 11-03 13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"},
> 150971668500                                                       0, 0),
> [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)], 1)
>         at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>         at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>         at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(
> FutureUtil.java:4                                                       3)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask$
> AsyncCheckpointRu
> nnable.run(StreamTask.java:897)
>         ... 5 more
>         Suppressed: java.lang.Exception: Could not properly cancel managed
> keyed                                                        state future.
>                 at org.apache.flink.streaming.api.operators.
> OperatorSnapshotResu
> lt.cancel(OperatorSnapshotResult.java:90)
>                 at org.apache.flink.streaming.runtime.tasks.StreamTask$
> AsyncChec
> kpointRunnable.cleanup(StreamTask.java:1023)
>                 at org.apache.flink.streaming.runtime.tasks.StreamTask$
> AsyncChec
> kpointRunnable.run(StreamTask.java:961)
>                 ... 5 more
>         Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalStateException:
> Could not find id for entry: SharedBufferEntry(
> ValueTimeWrapper({"o
> rigin":"YUL","destination":"YWG","flight":"AC8593","
> aircraft":"CRJ7","registrati
> on":"C-GOJZ","callsign":"JZA593","speed":370,"altitude"
> :38000,"course":287,"time
> ":"2017-11-03 13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"},
> 1509                                                       716685000, 0),
> [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)], 1)
>                 at java.util.concurrent.FutureTask.report(FutureTask.
> java:122)
>                 at java.util.concurrent.FutureTask.get(FutureTask.
> java:192)
>                 at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(
> FutureUti                                                       l.java:43)
>                 at org.apache.flink.runtime.state.StateUtil.
> discardStateFuture(S
> tateUtil.java:85)
>                 at org.apache.flink.streaming.api.operators.
> OperatorSnapshotResu
> lt.cancel(OperatorSnapshotResult.java:88)
>                 ... 7 more
>         Caused by: java.lang.IllegalStateException: Could not find id for
> entry:
> SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG","
> flight"
> :"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","
> callsign":"JZA593","speed":
> 370,"altitude":38000,"course":287,"time":"2017-11-03
> 13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"},
> 1509716685000, 0), [SharedBufferEdge(null, 5)
> , SharedBufferEdge(null, 6)], 1)
>                 at org.apache.flink.util.Preconditions.checkState(
> Preconditions.
> java:195)
>                 at org.apache.flink.cep.nfa.SharedBuffer$
> SharedBufferSerializer.
> serialize(SharedBuffer.java:971)
>                 at org.apache.flink.cep.nfa.SharedBuffer$
> SharedBufferSerializer.
> serialize(SharedBuffer.java:838)
>                 at org.apache.flink.cep.nfa.NFA$
> NFASerializer.serialize(NFA.java
> :928)
>                 at org.apache.flink.cep.nfa.NFA$
> NFASerializer.serialize(NFA.java
> :852)
>                 at org.apache.flink.runtime.state.heap.
> NestedMapsStateTable$Nest
> edMapsStateTableSnapshot.writeMappingsInKeyGroup(
> NestedMapsStateTable.java:355)
>                 at org.apache.flink.runtime.state.heap.
> HeapKeyedStateBackend$1.p
> erformOperation(HeapKeyedStateBackend.java:347)
>                 at org.apache.flink.runtime.state.heap.
> HeapKeyedStateBackend$1.p
> erformOperation(HeapKeyedStateBackend.java:329)
>                 at org.apache.flink.runtime.io.
> async.AbstractAsyncIOCallable.cal
> l(AbstractAsyncIOCallable.java:72)
>                 at java.util.concurrent.FutureTask.run(FutureTask.
> java:266)
>                 at org.apache.flink.runtime.state.heap.
> HeapKeyedStateBackend.sna
> pshot(HeapKeyedStateBackend.java:372)
>                 at org.apache.flink.streaming.api.operators.
> AbstractStreamOperat
> or.snapshotState(AbstractStreamOperator.java:397)
>                 at org.apache.flink.streaming.runtime.tasks.StreamTask$
> Checkpoin
> tingOperation.checkpointStreamOperator(StreamTask.java:1162)
>                 at org.apache.flink.streaming.runtime.tasks.StreamTask$
> Checkpoin
> tingOperation.executeCheckpointing(StreamTask.java:1094)
>                 at org.apache.flink.streaming.runtime.tasks.StreamTask.
> checkpoin
> tState(StreamTask.java:654)
>                 at org.apache.flink.streaming.runtime.tasks.StreamTask.
> performCh
> eckpoint(StreamTask.java:590)
>                 at org.apache.flink.streaming.runtime.tasks.StreamTask.
> triggerCh
> eckpointOnBarrier(StreamTask.java:543)
>                 at org.apache.flink.streaming.runtime.io.BarrierBuffer.
> notifyChe
> ckpoint(BarrierBuffer.java:378)
>                 at org.apache.flink.streaming.runtime.io.BarrierBuffer.
> processBa
> rrier(BarrierBuffer.java:228)
>                 at org.apache.flink.streaming.runtime.io.BarrierBuffer.
> getNextNo
> nBlocked(BarrierBuffer.java:183)
>                 at org.apache.flink.streaming.runtime.io.
> StreamInputProcessor.pr
> ocessInput(StreamInputProcessor.java:213)
>                 at org.apache.flink.streaming.runtime.tasks.
> OneInputStreamTask.r
> un(OneInputStreamTask.java:69)
>                 at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(St
> reamTask.java:263)
>                 at org.apache.flink.runtime.taskmanager.Task.run(Task.
> java:702)
>                 ... 1 more
>         [CIRCULAR REFERENCE:java.lang.IllegalStateException: Could not
> find id f                                                       or entry:
> SharedBufferEntry(ValueTimeWrapper({"origin":"
> YUL","destination":"YWG"
> ,"flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ"
> ,"callsign":"JZA593"
> ,"speed":370,"altitude":38000,"course":287,"time":"2017-11-03
> 13:44:45.000","lat
> ":47.9129,"lon":-87.2922,"_id":"AC8593-1"}, 1509716685000, 0),
> [SharedBufferEdge
> (null, 5), SharedBufferEdge(null, 6)], 1)]
> 11/03/2017 13:46:46     Source: json-parser(1/1) switched to CANCELING
> 11/03/2017 13:46:46     Process(1/1) switched to CANCELING
> 11/03/2017 13:46:46     Source: json-parser(1/1) switched to CANCELED
> 11/03/2017 13:46:46     Process(1/1) switched to CANCELED
> 11/03/2017 13:46:46     Job execution switched to status RESTARTING.
> 11/03/2017 13:46:56     Job execution switched to status CREATED.
> 11/03/2017 13:46:56     Job execution switched to status RUNNING.
> 11/03/2017 13:46:56     Source: json-parser(1/1) switched to SCHEDULED
> 11/03/2017 13:46:56     Process(1/1) switched to SCHEDULED
> 11/03/2017 13:46:56     KeyedCEPPatternOperator -> alert-select -> Sink:
> notific
> ation-sink-1(1/1) switched to SCHEDULED
> 11/03/2017 13:46:56     Source: json-parser(1/1) switched to DEPLOYING
> 11/03/2017 13:46:56     Process(1/1) switched to DEPLOYING
> 11/03/2017 13:46:56     KeyedCEPPatternOperator -> alert-select -> Sink:
> notific
> ation-sink-1(1/1) switched to DEPLOYING
> 11/03/2017 13:46:56     Process(1/1) switched to RUNNING
> 11/03/2017 13:46:56     KeyedCEPPatternOperator -> alert-select -> Sink:
> notific
> ation-sink-1(1/1) switched to RUNNING
> 11/03/2017 13:46:56     Source: json-parser(1/1) switched to RUNNING
> 11/03/2017 13:46:57     KeyedCEPPatternOperator -> alert-select -> Sink:
> notific
> ation-sink-1(1/1) switched to FAILED
> java.lang.IllegalStateException: Could not initialize keyed state backend.
>         at org.apache.flink.streaming.api.operators.
> AbstractStreamOperator.initK
> eyedState(AbstractStreamOperator.java:321)
>         at org.apache.flink.streaming.api.operators.
> AbstractStreamOperator.initi
> alizeState(AbstractStreamOperator.java:217)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:676)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:663)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.StreamCorruptedException: invalid type code: 00
>         at java.io.ObjectInputStream$BlockDataInputStream.readBlockHeader(ObjectInputStream.java:2519)
>         at java.io.ObjectInputStream$BlockDataInputStream.refill(ObjectInputStream.java:2553)
>         at java.io.ObjectInputStream$BlockDataInputStream.skipBlockData(ObjectInputStream.java:2455)
>         at java.io.ObjectInputStream.skipCustomData(ObjectInputStream.java:1951)
>         at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1621)
>         at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>         at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
>         at org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeCondition(NFA.java:1211)
>         at org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeStates(NFA.java:1169)
>         at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:957)
>         at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:852)
>         at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders$StateTableByKeyGroupReaderV2V3.readMappingsInKeyGroup(StateTableByKeyGroupReaders.java:132)
>         at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:518)
>         at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:397)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:772)
>         at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:311)
>         ... 6 more
>
> 11/03/2017 13:46:57     Job execution switched to status FAILING.
> java.lang.IllegalStateException: Could not initialize keyed state backend.
>         at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:321)
>         at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:217)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:676)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:663)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.StreamCorruptedException: invalid type code: 00
>         at java.io.ObjectInputStream$BlockDataInputStream.readBlockHeader(ObjectInputStream.java:2519)
>         at java.io.ObjectInputStream$BlockDataInputStream.refill(ObjectInputStream.java:2553)
>         at java.io.ObjectInputStream$BlockDataInputStream.skipBlockData(ObjectInputStream.java:2455)
>         at java.io.ObjectInputStream.skipCustomData(ObjectInputStream.java:1951)
>         at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1621)
>         at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>         at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
>         at org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeCondition(NFA.java:1211)
>         at org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeStates(NFA.java:1169)
>         at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:957)
>         at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:852)
>         at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders$StateTableByKeyGroupReaderV2V3.readMappingsInKeyGroup(StateTableByKeyGroupReaders.java:132)
>         at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:518)
>         at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:397)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:772)
>         at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:311)
>         ... 6 more
>
>
> What is happening here? Am I doing something wrong? Is there some sort of
> conflict between within-clause deadlines and checkpoint deadlines?
>
> I found the following similar JIRA pages, but none of those mention
> circular references: https://issues.apache.org/jira/browse/FLINK-6321
> https://issues.apache.org/jira/browse/FLINK-7484
> https://issues.apache.org/jira/browse/FLINK-7756
>
> Kind Regards,
> Federico D'Ambrosio
>



-- 
Federico D'Ambrosio

Re: FlinkCEP, circular references and checkpointing failures

Posted by Shailesh Jain <sh...@stellapps.com>.
Bump.

On Thu, Oct 25, 2018 at 9:11 AM Shailesh Jain <sh...@stellapps.com>
wrote:

> Hi Dawid,
>
> I've upgraded to Flink 1.6.1 and rebased my changes against the tag 1.6.1;
> the only commit on top of 1.6 is this:
> https://github.com/jainshailesh/flink/commit/797e3c4af5b28263fd98fb79daaba97cabf3392c
>
> I ran two otherwise identical jobs, one with and one without checkpointing
> enabled. I'm hitting an ArrayIndexOutOfBoundsException (and sometimes an NPE) *only
> when checkpointing (HDFS backend) is enabled*, with the stack trace below.
>
> I did see a similar problem with different operators here (
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/IndexOutOfBoundsException-on-deserialization-after-updating-to-1-6-1-td23933.html).
> Is this a known issue that is being addressed?
>
> Any ideas on what could be causing this?
>
> Thanks,
> Shailesh
>
>
> 2018-10-24 17:04:13,365 INFO
> org.apache.flink.runtime.taskmanager.Task                     -
> SelectCepOperatorMixedTime (1/1) - SelectCepOperatorMixedTime (1/1)
> (3d984b7919342a3886593401088ca2cd) switched from RUNNING to FAILED.
> org.apache.flink.util.FlinkRuntimeException: Failure happened in filter
> function.
>         at org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:731)
>         at org.apache.flink.cep.nfa.NFA.computeNextStates(NFA.java:541)
>         at org.apache.flink.cep.nfa.NFA.doProcess(NFA.java:284)
>         at org.apache.flink.cep.nfa.NFA.process(NFA.java:220)
>         at
> org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.processEvent(AbstractKeyedCEPPatternOperator.java:379)
>         at
> org.apache.flink.cep.operator.AbstractKeyedCEPPatternMixedTimeApproachOperator.processElement(AbstractKeyedCEPPatternMixedTimeApproachOperator.java:45)
>         at
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>         at
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>         at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.WrappingRuntimeException:
> java.lang.ArrayIndexOutOfBoundsException: -1
>         at
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.lambda$materializeMatch$1(SharedBuffer.java:305)
>         at java.util.HashMap.computeIfAbsent(HashMap.java:1127)
>         at
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.materializeMatch(SharedBuffer.java:301)
>         at
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.materializeMatch(SharedBuffer.java:291)
>         at
> org.apache.flink.cep.nfa.NFA$ConditionContext.getEventsForPattern(NFA.java:811)
>         at
> com.stellapps.contrakcep.flink.patterns.critical.AgitatorMalfunctionAfterChillingPattern$1.filter(AgitatorMalfunctionAfterChillingPattern.java:70)
>         at
> com.stellapps.contrakcep.flink.patterns.critical.AgitatorMalfunctionAfterChillingPattern$1.filter(AgitatorMalfunctionAfterChillingPattern.java:62)
>         at org.apache.flink.cep.nfa.NFA.checkFilterCondition(NFA.java:742)
>         at org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:716)
>         ... 10 more
> Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
>         at com.esotericsoftware.kryo.util.IntArray.add(IntArray.java:61)
>         at
> com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:800)
>         at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:655)
>         at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:231)
>         at
> org.apache.flink.cep.nfa.sharedbuffer.Lockable$LockableTypeSerializer.copy(Lockable.java:120)
>         at
> org.apache.flink.cep.nfa.sharedbuffer.Lockable$LockableTypeSerializer.copy(Lockable.java:95)
>         at
> org.apache.flink.api.common.typeutils.base.MapSerializer.copy(MapSerializer.java:113)
>         at
> org.apache.flink.api.common.typeutils.base.MapSerializer.copy(MapSerializer.java:49)
>         at
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:287)
>         at
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:311)
>         at
> org.apache.flink.runtime.state.heap.HeapMapState.get(HeapMapState.java:85)
>         at
> org.apache.flink.runtime.state.UserFacingMapState.get(UserFacingMapState.java:47)
>         at
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.lambda$materializeMatch$1(SharedBuffer.java:303)
>         ... 18 more
>
> On Fri, Sep 28, 2018 at 11:00 AM Shailesh Jain <
> shailesh.jain@stellapps.com> wrote:
>
>> Hi Dawid,
>>
>> Thanks for your time on this. The diff should have pointed out only the
>> top 3 commits, but since it did not, it is possible I did not rebase my
>> branch against 1.4.2 correctly. I'll check this out and get back to you if
>> I hit the same issue again.
>>
>> Thanks again,
>> Shailesh
>>
>> On Thu, Sep 27, 2018 at 1:00 PM Dawid Wysakowicz <dw...@apache.org>
>> wrote:
>>
>>> Hi Shailesh,
>>>
>>> I am afraid it is going to be hard to help you, as this branch differs
>>> significantly from the 1.4.2 release (I diffed your branch against
>>> tag/release-1.4.2). Moreover, the code in the branch you've provided still
>>> does not correspond to the lines in the exception you posted previously.
>>> Could you check whether the problem occurs on vanilla Flink as well?
>>>
>>> Best,
>>>
>>> Dawid
>>>
>>> On 27/09/18 08:22, Shailesh Jain wrote:
>>>
>>> Hi Dawid,
>>>
>>> Yes, it is version 1.4.2. We are running vanilla Flink, but have added a
>>> couple of changes to the CEP operator specifically (top 3 commits here:
>>> https://github.com/jainshailesh/flink/commits/poc_on_1.4.2). The changes
>>> I've made to the CEP operators do not touch the checkpointing path; they
>>> just overload the operator to handle event time in a specific way.
>>>
>>> We are hitting this in production, so I'm not sure it'll be feasible to
>>> move to 1.6.0 immediately, but eventually yes.
>>>
>>> Thanks,
>>> Shailesh
>>>
>>> On Wed, Sep 26, 2018 at 5:44 PM Dawid Wysakowicz <dw...@apache.org>
>>> wrote:
>>>
>>>> Hi Shailesh,
>>>>
>>>> Are you sure you are using version 1.4.2? Do you run vanilla Flink,
>>>> or have you introduced some changes? I am asking because the lines in the
>>>> stack trace do not align with the source code for 1.4.2.
>>>>
>>>> Also, it is a different exception from the one in the issue you
>>>> linked, so if it is a bug, it is definitely a different one. Lastly,
>>>> I would recommend upgrading to the newest version, as we rewrote
>>>> the SharedBuffer implementation in 1.6.0.
>>>>
>>>> Best,
>>>>
>>>> Dawid
>>>>
>>>> On 26/09/18 13:50, Shailesh Jain wrote:
>>>>
>>>> Hi,
>>>>
>>>> I think I've hit this same issue on a 3-node standalone cluster (1.4.2)
>>>> using HDFS (2.8.4) as the state backend.
>>>>
>>>> 2018-09-26 17:07:39,370 INFO
>>>> org.apache.flink.runtime.taskmanager.Task                     - Attempting
>>>> to fail task externally SelectCepOperator (1/1)
>>>> (3bec4aa1ef2226c4e0c5ff7b3860d340).
>>>> 2018-09-26 17:07:39,370 INFO
>>>> org.apache.flink.runtime.taskmanager.Task                     -
>>>> SelectCepOperator (1/1) (3bec4aa1ef2226c4e0c5ff7b3860d340) switched from
>>>> RUNNING to FAILED.
>>>> AsynchronousException{java.lang.Exception: Could not materialize
>>>> checkpoint 6 for operator SelectCepOperator (1/1).}
>>>>     at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:948)
>>>>     at
>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>     at
>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>     at
>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>     at java.lang.Thread.run(Thread.java:748)
>>>> Caused by: java.lang.Exception: Could not materialize checkpoint 6 for
>>>> operator SelectCepOperator (1/1).
>>>>     ... 6 more
>>>> Caused by: java.util.concurrent.ExecutionException:
>>>> java.lang.NullPointerException
>>>>     at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>>>     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>>>     at
>>>> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>>>>     at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
>>>>     ... 5 more
>>>>     Suppressed: java.lang.Exception: Could not properly cancel managed
>>>> keyed state future.
>>>>         at
>>>> org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:91)
>>>>         at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:976)
>>>>         at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:939)
>>>>         ... 5 more
>>>>     Caused by: java.util.concurrent.ExecutionException:
>>>> java.lang.NullPointerException
>>>>         at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>>>         at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>>>         at
>>>> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>>>>         at
>>>> org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:66)
>>>>         at
>>>> org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:89)
>>>>         ... 7 more
>>>>     Caused by: java.lang.NullPointerException
>>>>         at
>>>> org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:954)
>>>>         at
>>>> org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:825)
>>>>         at
>>>> org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:888)
>>>>         at
>>>> org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:820)
>>>>         at
>>>> org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot.writeMappingsInKeyGroup(CopyOnWriteStateTableSnapshot.java:196)
>>>>         at
>>>> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:390)
>>>>         at
>>>> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:339)
>>>>         at
>>>> org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources.call(AbstractAsyncCallableWithResources.java:75)
>>>>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>         at
>>>> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
>>>>         at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
>>>>         ... 5 more
>>>>     [CIRCULAR REFERENCE:java.lang.NullPointerException]
>>>>
>>>> Any ideas why I'm hitting this, especially since this issue (
>>>> https://issues.apache.org/jira/browse/FLINK-7756) says it has been
>>>> fixed in 1.4.2?
>>>>
>>>> On Sat, Nov 4, 2017 at 12:34 AM Federico D'Ambrosio <
>>>> federico.dambrosio@smartlab.ws> wrote:
>>>>
>>>>> Thank you very much for your prompt response, Kostas!
>>>>>
>>>>> Cheers,
>>>>> Federico
>>>>>
>>>>> 2017-11-03 16:26 GMT+01:00 Kostas Kloudas <k.kloudas@data-artisans.com
>>>>> >:
>>>>>
>>>>>> Hi Federico,
>>>>>>
>>>>>> Thanks for trying it out!
>>>>>> Great to hear that your problem was fixed!
>>>>>>
>>>>>> The feature freeze for the release is going to be next week, and I
>>>>>> would expect 1 or 2 more weeks of testing.
>>>>>> So I would say in 2.5 weeks. But this is of course subject to
>>>>>> potential issues we may find during testing.
>>>>>>
>>>>>> Cheers,
>>>>>> Kostas
>>>>>>
>>>>>> On Nov 3, 2017, at 4:22 PM, Federico D'Ambrosio <
>>>>>> federico.dambrosio@smartlab.ws> wrote:
>>>>>>
>>>>>> Hi Kostas,
>>>>>>
>>>>>> I just tried running the same job with 1.4-SNAPSHOT for 10 minutes
>>>>>> and it didn't crash, so it was the same underlying issue as the JIRA you
>>>>>> linked.
>>>>>>
>>>>>> Do you happen to know when the stable 1.4 release is expected?
>>>>>>
>>>>>> Thank you very much,
>>>>>> Federico
>>>>>>
>>>>>> 2017-11-03 15:25 GMT+01:00 Kostas Kloudas <
>>>>>> k.kloudas@data-artisans.com>:
>>>>>>
>>>>>>> Perfect! Thanks a lot!
>>>>>>>
>>>>>>> Kostas
>>>>>>>
>>>>>>> On Nov 3, 2017, at 3:23 PM, Federico D'Ambrosio <
>>>>>>> federico.dambrosio@smartlab.ws> wrote:
>>>>>>>
>>>>>>> Hi Kostas,
>>>>>>>
>>>>>>> Yes, I'm using 1.3.2. I'll try the current master and get back
>>>>>>> to you.
>>>>>>>
>>>>>>> 2017-11-03 15:21 GMT+01:00 Kostas Kloudas <
>>>>>>> k.kloudas@data-artisans.com>:
>>>>>>>
>>>>>>>> Hi Federico,
>>>>>>>>
>>>>>>>> I assume that you are using Flink 1.3, right?
>>>>>>>>
>>>>>>>> If so, we fixed a bug in 1.4 that seems similar to your
>>>>>>>> case:
>>>>>>>> https://issues.apache.org/jira/browse/FLINK-7756
>>>>>>>>
>>>>>>>> Could you try the current master to see if it fixes your problem?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Kostas
>>>>>>>>
>>>>>>>> On Nov 3, 2017, at 3:12 PM, Federico D'Ambrosio <
>>>>>>>> federico.dambrosio@smartlab.ws> wrote:
>>>>>>>>
>>>>>>>>  Could not find id for
>>>>>>>> entry:
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Federico D'Ambrosio
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Federico D'Ambrosio
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Federico D'Ambrosio
>>>>>
>>>>
>>>>
>>>

Re: FlinkCEP, circular references and checkpointing failures

Posted by Shailesh Jain <sh...@stellapps.com>.
Thank you, Stefan. Any idea when we can expect the 1.6.3 release?

On Thu, Nov 8, 2018 at 4:28 PM Stefan Richter <s....@data-artisans.com>
wrote:

> Sure, it is already merged as FLINK-10816.
>
> Best,
> Stefan
>
> On 8. Nov 2018, at 11:53, Shailesh Jain <sh...@stellapps.com>
> wrote:
>
> Thanks a lot for looking into this issue, Stefan.
>
> Could you please let me know the issue ID once you open it? It'll help me
> understand the problem better, and I could also run a quick test in our
> environment once the issue is resolved.
>
> Thanks,
> Shailesh
>
> On Wed, Nov 7, 2018, 10:46 PM Till Rohrmann <trohrmann@apache.org> wrote:
>
>> Really good finding Stefan!
>>
>> On Wed, Nov 7, 2018 at 5:28 PM Stefan Richter <
>> s.richter@data-artisans.com> wrote:
>>
>>> Hi,
>>>
>>> I think I can already spot the
>>> problem: LockableTypeSerializer.duplicate() is not properly implemented
>>> because it also has to call duplicate() on the element serialiser that is
>>> passed into the constructor of the new instance. I will open an issue and
>>> fix the problem.
>>>
>>> Best,
>>> Stefan
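Stefan's diagnosis can be illustrated with a minimal, self-contained Java sketch. It assumes only the general contract of Flink's TypeSerializer.duplicate(): a stateful serializer (such as a Kryo-backed one) must hand out a fresh instance, so that, for example, the asynchronous snapshot thread and the processing thread do not share mutable serializer state, while a stateless serializer may return itself. SimpleSerializer, StatefulStringSerializer, Counted and CountedSerializer are hypothetical stand-ins, not Flink classes, and the actual FLINK-10816 patch may look different.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for a serializer; this is NOT Flink's TypeSerializer API.
interface SimpleSerializer<T> {
    // Returns an instance another thread may use; may return `this` if the serializer is stateless.
    SimpleSerializer<T> duplicate();

    T copy(T value);
}

// Hypothetical stateful element serializer, standing in for a Kryo-backed one:
// it mutates a per-instance scratch buffer, so one instance must not be shared across threads.
class StatefulStringSerializer implements SimpleSerializer<String> {
    private final List<Integer> scratch = new ArrayList<>();

    @Override
    public SimpleSerializer<String> duplicate() {
        return new StatefulStringSerializer(); // fresh mutable state for the new user
    }

    @Override
    public String copy(String value) {
        scratch.clear();             // mutation that would race if the instance were shared
        scratch.add(value.length());
        return value;                // Strings are immutable, so the same reference is a valid copy
    }
}

// Hypothetical wrapper pairing an element with a reference count, loosely modelled on CEP's Lockable<E>.
final class Counted<E> {
    final E element;
    final int refCount;

    Counted(E element, int refCount) {
        this.element = element;
        this.refCount = refCount;
    }
}

// Hypothetical composite serializer, loosely modelled on LockableTypeSerializer.
class CountedSerializer<E> implements SimpleSerializer<Counted<E>> {
    final SimpleSerializer<E> elementSerializer;

    CountedSerializer(SimpleSerializer<E> elementSerializer) {
        this.elementSerializer = elementSerializer;
    }

    // Buggy pattern: the new wrapper shares the stateful element serializer with the original.
    CountedSerializer<E> duplicateShallow() {
        return new CountedSerializer<E>(elementSerializer);
    }

    // Fixed pattern: duplicate the nested serializer too, and reuse `this` only when the
    // nested serializer is stateless (i.e. its duplicate() returned the same instance).
    @Override
    public CountedSerializer<E> duplicate() {
        SimpleSerializer<E> elementCopy = elementSerializer.duplicate();
        return elementCopy == elementSerializer ? this : new CountedSerializer<E>(elementCopy);
    }

    @Override
    public Counted<E> copy(Counted<E> value) {
        return new Counted<E>(elementSerializer.copy(value.element), value.refCount);
    }
}
```

With duplicateShallow(), both wrapper instances drive the same stateful element serializer; with Kryo underneath, that kind of concurrent sharing could plausibly corrupt its internal reference tracking, which would be consistent with the ArrayIndexOutOfBoundsException: -1 from Kryo.readReferenceOrNull in the trace further down this thread.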
>>>
>>> On 7. Nov 2018, at 17:17, Till Rohrmann <tr...@apache.org> wrote:
>>>
>>> Hi Shailesh,
>>>
>>> could you maybe provide us with an example program that reproduces this
>>> problem? This would help the community debug it. It does not look
>>> right and might point towards a bug in Flink. Thanks
>>> a lot!
>>>
>>> Cheers,
>>> Till
>>>
>>> On Tue, Oct 30, 2018 at 9:10 AM Dawid Wysakowicz <dw...@apache.org>
>>> wrote:
>>>
>>>> This is a problem with serializing your events using Kryo. I'm
>>>> adding Gordon to cc, as he was recently working on serializers. He might
>>>> give you more insight into what is going wrong.
>>>>
>>>> Best,
>>>>
>>>> Dawid
>>>> On 25/10/2018 05:41, Shailesh Jain wrote:
>>>>
>>>> Hi Dawid,
>>>>
>>>> I've upgraded to Flink 1.6.1 and rebased my changes against the tag
>>>> 1.6.1; the only commit on top of 1.6 is this:
>>>> https://github.com/jainshailesh/flink/commit/797e3c4af5b28263fd98fb79daaba97cabf3392c
>>>>
>>>> I ran two otherwise identical jobs, one with and one without checkpointing
>>>> enabled. I'm hitting an ArrayIndexOutOfBoundsException (and sometimes an NPE) *only
>>>> when checkpointing (HDFS backend) is enabled*, with the stack trace
>>>> below.
>>>>
>>>> I did see a similar problem with different operators here (
>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/IndexOutOfBoundsException-on-deserialization-after-updating-to-1-6-1-td23933.html).
>>>> Is this a known issue that is being addressed?
>>>>
>>>> Any ideas on what could be causing this?
>>>>
>>>> Thanks,
>>>> Shailesh
>>>>
>>>>
>>>> 2018-10-24 17:04:13,365 INFO
>>>> org.apache.flink.runtime.taskmanager.Task                     -
>>>> SelectCepOperatorMixedTime (1/1) - SelectCepOperatorMixedTime (1/1)
>>>> (3d984b7919342a3886593401088ca2cd) switched from RUNNING to FAILED.
>>>> org.apache.flink.util.FlinkRuntimeException: Failure happened in filter
>>>> function.
>>>>         at
>>>> org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:731)
>>>>         at org.apache.flink.cep.nfa.NFA.computeNextStates(NFA.java:541)
>>>>         at org.apache.flink.cep.nfa.NFA.doProcess(NFA.java:284)
>>>>         at org.apache.flink.cep.nfa.NFA.process(NFA.java:220)
>>>>         at
>>>> org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.processEvent(AbstractKeyedCEPPatternOperator.java:379)
>>>>         at
>>>> org.apache.flink.cep.operator.AbstractKeyedCEPPatternMixedTimeApproachOperator.processElement(AbstractKeyedCEPPatternMixedTimeApproachOperator.java:45)
>>>>         at
>>>> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>>>>         at
>>>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>>>>         at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>>>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>>>>         at java.lang.Thread.run(Thread.java:748)
>>>> Caused by: org.apache.flink.util.WrappingRuntimeException:
>>>> java.lang.ArrayIndexOutOfBoundsException: -1
>>>>         at
>>>> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.lambda$materializeMatch$1(SharedBuffer.java:305)
>>>>         at java.util.HashMap.computeIfAbsent(HashMap.java:1127)
>>>>         at
>>>> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.materializeMatch(SharedBuffer.java:301)
>>>>         at
>>>> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.materializeMatch(SharedBuffer.java:291)
>>>>         at
>>>> org.apache.flink.cep.nfa.NFA$ConditionContext.getEventsForPattern(NFA.java:811)
>>>>         at
>>>> com.stellapps.contrakcep.flink.patterns.critical.AgitatorMalfunctionAfterChillingPattern$1.filter(AgitatorMalfunctionAfterChillingPattern.java:70)
>>>>         at
>>>> com.stellapps.contrakcep.flink.patterns.critical.AgitatorMalfunctionAfterChillingPattern$1.filter(AgitatorMalfunctionAfterChillingPattern.java:62)
>>>>         at
>>>> org.apache.flink.cep.nfa.NFA.checkFilterCondition(NFA.java:742)
>>>>         at
>>>> org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:716)
>>>>         ... 10 more
>>>> Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
>>>>         at com.esotericsoftware.kryo.util.IntArray.add(IntArray.java:61)
>>>>         at
>>>> com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:800)
>>>>         at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:655)
>>>>         at
>>>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:231)
>>>>         at
>>>> org.apache.flink.cep.nfa.sharedbuffer.Lockable$LockableTypeSerializer.copy(Lockable.java:120)
>>>>         at
>>>> org.apache.flink.cep.nfa.sharedbuffer.Lockable$LockableTypeSerializer.copy(Lockable.java:95)
>>>>         at
>>>> org.apache.flink.api.common.typeutils.base.MapSerializer.copy(MapSerializer.java:113)
>>>>         at
>>>> org.apache.flink.api.common.typeutils.base.MapSerializer.copy(MapSerializer.java:49)
>>>>         at
>>>> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:287)
>>>>         at
>>>> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:311)
>>>>         at
>>>> org.apache.flink.runtime.state.heap.HeapMapState.get(HeapMapState.java:85)
>>>>         at
>>>> org.apache.flink.runtime.state.UserFacingMapState.get(UserFacingMapState.java:47)
>>>>         at
>>>> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.lambda$materializeMatch$1(SharedBuffer.java:303)
>>>>         ... 18 more
>>>>
>>>> On Fri, Sep 28, 2018 at 11:00 AM Shailesh Jain <
>>>> shailesh.jain@stellapps.com> wrote:
>>>>
>>>>> Hi Dawid,
>>>>>
>>>>> Thanks for your time on this. The diff should have pointed out only
>>>>> the top 3 commits, but since it did not, it is possible I did not rebase my
>>>>> branch against 1.4.2 correctly. I'll check this out and get back to you if
>>>>> I hit the same issue again.
>>>>>
>>>>> Thanks again,
>>>>> Shailesh
>>>>>
>>>>> On Thu, Sep 27, 2018 at 1:00 PM Dawid Wysakowicz <
>>>>> dwysakowicz@apache.org> wrote:
>>>>>
>>>>>> Hi Shailesh,
>>>>>>
>>>>>> I am afraid it is going to be hard to help you, as this branch differs
>>>>>> significantly from the 1.4.2 release (I diffed your branch against
>>>>>> tag/release-1.4.2). Moreover, the code in the branch you've provided still
>>>>>> does not correspond to the lines in the exception you posted previously.
>>>>>> Could you check whether the problem occurs on vanilla Flink as well?
>>>>>>
>>>>>> Best,
>>>>>>
>>>>>> Dawid
>>>>>>
>>>>>> On 27/09/18 08:22, Shailesh Jain wrote:
>>>>>>
>>>>>> Hi Dawid,
>>>>>>
>>>>>> Yes, it is version 1.4.2. We are running vanilla Flink, but have
>>>>>> added a couple of changes to the CEP operator specifically (top 3 commits
>>>>>> here: https://github.com/jainshailesh/flink/commits/poc_on_1.4.2).
>>>>>> The changes I've made to the CEP operators do not touch the checkpointing
>>>>>> path; they just overload the operator to handle event time in a specific way.
>>>>>>
>>>>>> We are hitting this in production, so I'm not sure it'll be feasible
>>>>>> to move to 1.6.0 immediately, but eventually yes.
>>>>>>
>>>>>> Thanks,
>>>>>> Shailesh
>>>>>>
>>>>>> On Wed, Sep 26, 2018 at 5:44 PM Dawid Wysakowicz <
>>>>>> dwysakowicz@apache.org> wrote:
>>>>>>
>>>>>>> Hi Shailesh,
>>>>>>>
>>>>>>> Are you sure you are using version 1.4.2? Do you run vanilla
>>>>>>> Flink, or have you introduced some changes? I am asking because the lines in
>>>>>>> the stack trace do not align with the source code for 1.4.2.
>>>>>>>
>>>>>>> Also, it is a different exception from the one in the issue you
>>>>>>> linked, so if it is a bug, it is definitely a different one. Lastly,
>>>>>>> I would recommend upgrading to the newest version, as we rewrote
>>>>>>> the SharedBuffer implementation in 1.6.0.
>>>>>>>
>>>>>>> Best,
>>>>>>>
>>>>>>> Dawid
>>>>>>>
>>>>>>> On 26/09/18 13:50, Shailesh Jain wrote:
>>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I think I've hit this same issue on a 3-node standalone cluster
>>>>>>> (1.4.2) using HDFS (2.8.4) as the state backend.
>>>>>>>
>>>>>>> 2018-09-26 17:07:39,370 INFO
>>>>>>> org.apache.flink.runtime.taskmanager.Task                     - Attempting
>>>>>>> to fail task externally SelectCepOperator (1/1)
>>>>>>> (3bec4aa1ef2226c4e0c5ff7b3860d340).
>>>>>>> 2018-09-26 17:07:39,370 INFO
>>>>>>> org.apache.flink.runtime.taskmanager.Task                     -
>>>>>>> SelectCepOperator (1/1) (3bec4aa1ef2226c4e0c5ff7b3860d340) switched from
>>>>>>> RUNNING to FAILED.
>>>>>>> AsynchronousException{java.lang.Exception: Could not materialize
>>>>>>> checkpoint 6 for operator SelectCepOperator (1/1).}
>>>>>>>     at
>>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:948)
>>>>>>>     at
>>>>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>>>     at
>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>>>     at
>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>>>> Caused by: java.lang.Exception: Could not materialize checkpoint 6
>>>>>>> for operator SelectCepOperator (1/1).
>>>>>>>     ... 6 more
>>>>>>> Caused by: java.util.concurrent.ExecutionException:
>>>>>>> java.lang.NullPointerException
>>>>>>>     at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>>>>>>     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>>>>>>     at
>>>>>>> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>>>>>>>     at
>>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
>>>>>>>     ... 5 more
>>>>>>>     Suppressed: java.lang.Exception: Could not properly cancel
>>>>>>> managed keyed state future.
>>>>>>>         at
>>>>>>> org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:91)
>>>>>>>         at
>>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:976)
>>>>>>>         at
>>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:939)
>>>>>>>         ... 5 more
>>>>>>>     Caused by: java.util.concurrent.ExecutionException:
>>>>>>> java.lang.NullPointerException
>>>>>>>         at
>>>>>>> java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>>>>>>         at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>>>>>>         at
>>>>>>> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>>>>>>>         at
>>>>>>> org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:66)
>>>>>>>         at
>>>>>>> org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:89)
>>>>>>>         ... 7 more
>>>>>>>     Caused by: java.lang.NullPointerException
>>>>>>>         at
>>>>>>> org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:954)
>>>>>>>         at
>>>>>>> org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:825)
>>>>>>>         at
>>>>>>> org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:888)
>>>>>>>         at
>>>>>>> org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:820)
>>>>>>>         at
>>>>>>> org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot.writeMappingsInKeyGroup(CopyOnWriteStateTableSnapshot.java:196)
>>>>>>>         at
>>>>>>> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:390)
>>>>>>>         at
>>>>>>> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:339)
>>>>>>>         at
>>>>>>> org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources.call(AbstractAsyncCallableWithResources.java:75)
>>>>>>>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>>>         at
>>>>>>> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
>>>>>>>         at
>>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
>>>>>>>         ... 5 more
>>>>>>>     [CIRCULAR REFERENCE:java.lang.NullPointerException]
>>>>>>>
>>>>>>> Any ideas on why I'm hitting this, especially when FLINK-7756 (
>>>>>>> https://issues.apache.org/jira/browse/FLINK-7756) says it has been
>>>>>>> fixed in 1.4.2?
>>>>>>>
>>>>>>> On Sat, Nov 4, 2017 at 12:34 AM Federico D'Ambrosio <
>>>>>>> federico.dambrosio@smartlab.ws> wrote:
>>>>>>>
>>>>>>>> Thank you very much for your prompt response, Kostas!
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Federico
>>>>>>>>
>>>>>>>> 2017-11-03 16:26 GMT+01:00 Kostas Kloudas <
>>>>>>>> k.kloudas@data-artisans.com>:
>>>>>>>>
>>>>>>>>> Hi Federico,
>>>>>>>>>
>>>>>>>>> Thanks for trying it out!
>>>>>>>>> Great to hear that your problem was fixed!
>>>>>>>>>
>>>>>>>>> The feature freeze for the release is going to be next week, and I
>>>>>>>>> would expect 1 or 2 more weeks of testing.
>>>>>>>>> So I would say in about 2.5 weeks, but this is of course subject to
>>>>>>>>> potential issues we may find during testing.
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Kostas
>>>>>>>>>
>>>>>>>>> On Nov 3, 2017, at 4:22 PM, Federico D'Ambrosio <
>>>>>>>>> federico.dambrosio@smartlab.ws> wrote:
>>>>>>>>>
>>>>>>>>> Hi Kostas,
>>>>>>>>>
>>>>>>>>> I just tried running the same job with 1.4-SNAPSHOT for 10 minutes
>>>>>>>>> and it didn't crash, so it was the same underlying issue as the one in
>>>>>>>>> the JIRA you linked.
>>>>>>>>>
>>>>>>>>> Do you happen to know when the 1.4 stable release is expected?
>>>>>>>>>
>>>>>>>>> Thank you very much,
>>>>>>>>> Federico
>>>>>>>>>
>>>>>>>>> 2017-11-03 15:25 GMT+01:00 Kostas Kloudas <
>>>>>>>>> k.kloudas@data-artisans.com>:
>>>>>>>>>
>>>>>>>>>> Perfect! Thanks a lot!
>>>>>>>>>>
>>>>>>>>>> Kostas
>>>>>>>>>>
>>>>>>>>>> On Nov 3, 2017, at 3:23 PM, Federico D'Ambrosio <
>>>>>>>>>> federico.dambrosio@smartlab.ws> wrote:
>>>>>>>>>>
>>>>>>>>>> Hi Kostas,
>>>>>>>>>>
>>>>>>>>>> Yes, I'm using 1.3.2. I'll try the current master and get
>>>>>>>>>> back to you.
>>>>>>>>>>
>>>>>>>>>> 2017-11-03 15:21 GMT+01:00 Kostas Kloudas <
>>>>>>>>>> k.kloudas@data-artisans.com>:
>>>>>>>>>>
>>>>>>>>>>> Hi Federico,
>>>>>>>>>>>
>>>>>>>>>>> I assume that you are using Flink 1.3, right?
>>>>>>>>>>>
>>>>>>>>>>> In that case: in 1.4 we fixed a bug that seems similar to
>>>>>>>>>>> yours:
>>>>>>>>>>> https://issues.apache.org/jira/browse/FLINK-7756
>>>>>>>>>>>
>>>>>>>>>>> Could you try the current master to see if it fixes your problem?
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Kostas
>>>>>>>>>>>
>>>>>>>>>>> On Nov 3, 2017, at 3:12 PM, Federico D'Ambrosio <
>>>>>>>>>>> federico.dambrosio@smartlab.ws> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Could not find id for entry:
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Federico D'Ambrosio
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Federico D'Ambrosio
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Federico D'Ambrosio
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>
>

Re: FlinkCEP, circular references and checkpointing failures

Posted by Stefan Richter <s....@data-artisans.com>.
Sure, it is already merged as FLINK-10816.

Best,
Stefan

> On 8. Nov 2018, at 11:53, Shailesh Jain <sh...@stellapps.com> wrote:
> 
> Thanks a lot for looking into this issue Stefan.
> 
> Could you please let me know the issue ID once you open it? It'll help me understand the problem better, and I could also run a quick test in our environment once the issue is resolved.
> 
> Thanks,
> Shailesh
> 
> On Wed, Nov 7, 2018, 10:46 PM Till Rohrmann <trohrmann@apache.org> wrote:
> Really good find, Stefan!
> 
> On Wed, Nov 7, 2018 at 5:28 PM Stefan Richter <s.richter@data-artisans.com> wrote:
> Hi,
> 
> I think I can already spot the problem: LockableTypeSerializer.duplicate() is not properly implemented because it also has to call duplicate() on the element serialiser that is passed into the constructor of the new instance. I will open an issue and fix the problem.
> 
> Best,
> Stefan
> 
>> On 7. Nov 2018, at 17:17, Till Rohrmann <trohrmann@apache.org> wrote:
>> 
>> Hi Shailesh,
>> 
>> Could you provide us with an example program that reproduces this problem? That would help the community debug it. It does not look right and might point to a bug in Flink. Thanks a lot!
>> 
>> Cheers,
>> Till
>> 
>> On Tue, Oct 30, 2018 at 9:10 AM Dawid Wysakowicz <dwysakowicz@apache.org> wrote:
>> This looks like a problem with serializing your events using Kryo. I'm adding Gordon to cc, as he has recently been working on serializers. He might give you more insight into what is going wrong.
>> 
>> Best,
>> 
>> Dawid
>> 
>> On 25/10/2018 05:41, Shailesh Jain wrote:
>>> Hi Dawid,
>>> 
>>> I've upgraded to Flink 1.6.1 and rebased my changes against the 1.6.1 tag; the only commit on top of 1.6 is this one: https://github.com/jainshailesh/flink/commit/797e3c4af5b28263fd98fb79daaba97cabf3392c
>>> 
>>> I ran two otherwise identical jobs, one with and one without checkpointing enabled. I'm hitting an ArrayIndexOutOfBoundsException (and sometimes an NPE) only when checkpointing (HDFS backend) is enabled, with the stack trace below.
>>> 
>>> I did see a similar problem with different operators here (http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/IndexOutOfBoundsException-on-deserialization-after-updating-to-1-6-1-td23933.html). Is this a known issue that is being addressed?
>>> 
>>> Any ideas on what could be causing this?
>>> 
>>> Thanks,
>>> Shailesh
>>> 
>>> 
>>> 2018-10-24 17:04:13,365 INFO  org.apache.flink.runtime.taskmanager.Task                     - SelectCepOperatorMixedTime (1/1) - SelectCepOperatorMixedTime (1/1) (3d984b7919342a3886593401088ca2cd) switched from RUNNING to FAILED.
>>> org.apache.flink.util.FlinkRuntimeException: Failure happened in filter function.
>>>         at org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:731)
>>>         at org.apache.flink.cep.nfa.NFA.computeNextStates(NFA.java:541)
>>>         at org.apache.flink.cep.nfa.NFA.doProcess(NFA.java:284)
>>>         at org.apache.flink.cep.nfa.NFA.process(NFA.java:220)
>>>         at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.processEvent(AbstractKeyedCEPPatternOperator.java:379)
>>>         at org.apache.flink.cep.operator.AbstractKeyedCEPPatternMixedTimeApproachOperator.processElement(AbstractKeyedCEPPatternMixedTimeApproachOperator.java:45)
>>>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>>>         at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>>>         at java.lang.Thread.run(Thread.java:748)
>>> Caused by: org.apache.flink.util.WrappingRuntimeException: java.lang.ArrayIndexOutOfBoundsException: -1
>>>         at org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.lambda$materializeMatch$1(SharedBuffer.java:305)
>>>         at java.util.HashMap.computeIfAbsent(HashMap.java:1127)
>>>         at org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.materializeMatch(SharedBuffer.java:301)
>>>         at org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.materializeMatch(SharedBuffer.java:291)
>>>         at org.apache.flink.cep.nfa.NFA$ConditionContext.getEventsForPattern(NFA.java:811)
>>>         at com.stellapps.contrakcep.flink.patterns.critical.AgitatorMalfunctionAfterChillingPattern$1.filter(AgitatorMalfunctionAfterChillingPattern.java:70)
>>>         at com.stellapps.contrakcep.flink.patterns.critical.AgitatorMalfunctionAfterChillingPattern$1.filter(AgitatorMalfunctionAfterChillingPattern.java:62)
>>>         at org.apache.flink.cep.nfa.NFA.checkFilterCondition(NFA.java:742)
>>>         at org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:716)
>>>         ... 10 more
>>> Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
>>>         at com.esotericsoftware.kryo.util.IntArray.add(IntArray.java:61)
>>>         at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:800)
>>>         at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:655)
>>>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:231)
>>>         at org.apache.flink.cep.nfa.sharedbuffer.Lockable$LockableTypeSerializer.copy(Lockable.java:120)
>>>         at org.apache.flink.cep.nfa.sharedbuffer.Lockable$LockableTypeSerializer.copy(Lockable.java:95)
>>>         at org.apache.flink.api.common.typeutils.base.MapSerializer.copy(MapSerializer.java:113)
>>>         at org.apache.flink.api.common.typeutils.base.MapSerializer.copy(MapSerializer.java:49)
>>>         at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:287)
>>>         at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:311)
>>>         at org.apache.flink.runtime.state.heap.HeapMapState.get(HeapMapState.java:85)
>>>         at org.apache.flink.runtime.state.UserFacingMapState.get(UserFacingMapState.java:47)
>>>         at org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.lambda$materializeMatch$1(SharedBuffer.java:303)
>>>         ... 18 more
>>> 
>>> On Fri, Sep 28, 2018 at 11:00 AM Shailesh Jain <shailesh.jain@stellapps.com> wrote:
>>> Hi Dawid,
>>> 
>>> Thanks for your time on this. The diff should have pointed out only the top 3 commits, but since it did not, it is possible I did not rebase my branch against 1.4.2 correctly. I'll check this out and get back to you if I hit the same issue again.
>>> 
>>> Thanks again,
>>> Shailesh
>>> 
>>> On Thu, Sep 27, 2018 at 1:00 PM Dawid Wysakowicz <dwysakowicz@apache.org> wrote:
>>> Hi Shailesh,
>>> 
>>> I am afraid it is going to be hard to help you, as this branch differs significantly from the 1.4.2 release (I've diffed your branch against tag/release-1.4.2). Moreover, the code in the branch you've provided still does not correspond to the line numbers in the exception you posted previously. Could you check whether the problem also occurs on vanilla Flink?
>>> 
>>> Best,
>>> 
>>> Dawid
>>> 
>>> 
>>> On 27/09/18 08:22, Shailesh Jain wrote:
>>>> Hi Dawid,
>>>> 
>>>> Yes, it is version 1.4.2. We are running vanilla Flink, but have added a couple of changes specifically in the CEP operator (top 3 commits here: https://github.com/jainshailesh/flink/commits/poc_on_1.4.2). The changes I've made to the CEP operators do not touch the checkpointing path; they just overload the operator to handle event time in a specific way.
>>>> 
>>>> We are hitting this in production, so I'm not sure it'll be feasible to move to 1.6.0 immediately, but eventually yes.
>>>> 
>>>> Thanks,
>>>> Shailesh
>>>> 
>>>> On Wed, Sep 26, 2018 at 5:44 PM Dawid Wysakowicz <dwysakowicz@apache.org> wrote:
>>>> Hi Shailesh,
>>>> 
>>>> Are you sure you are using version 1.4.2? Do you run vanilla Flink, or have you introduced some changes? I am asking because the line numbers in the stack trace do not align with the source code of 1.4.2.
>>>> 
>>>> Also, it is a different exception from the one in the issue you've linked, so if this is a bug, it is definitely a different one. Lastly, I would recommend upgrading to the newest version, as we rewrote the SharedBuffer implementation in 1.6.0.
>>>> 
>>>> Best,
>>>> 
>>>> Dawid
>>>> 
>>>> 
>>>> On 26/09/18 13:50, Shailesh Jain wrote:
>>>>> Hi,
>>>>> 
>>>>> I think I've hit this same issue on a 3-node standalone cluster (Flink 1.4.2) using HDFS (2.8.4) as the state backend.
>>>>> 
>>>>> 2018-09-26 17:07:39,370 INFO  org.apache.flink.runtime.taskmanager.Task                     - Attempting to fail task externally SelectCepOperator (1/1) (3bec4aa1ef2226c4e0c5ff7b3860d340).
>>>>> 2018-09-26 17:07:39,370 INFO  org.apache.flink.runtime.taskmanager.Task                     - SelectCepOperator (1/1) (3bec4aa1ef2226c4e0c5ff7b3860d340) switched from RUNNING to FAILED.
>>>>> AsynchronousException{java.lang.Exception: Could not materialize checkpoint 6 for operator SelectCepOperator (1/1).}
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:948)
>>>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>> Caused by: java.lang.Exception: Could not materialize checkpoint 6 for operator SelectCepOperator (1/1).
>>>>>     ... 6 more
>>>>> Caused by: java.util.concurrent.ExecutionException: java.lang.NullPointerException
>>>>>     at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>>>>     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>>>>     at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
>>>>>     ... 5 more
>>>>>     Suppressed: java.lang.Exception: Could not properly cancel managed keyed state future.
>>>>>         at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:91)
>>>>>         at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:976)
>>>>>         at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:939)
>>>>>         ... 5 more
>>>>>     Caused by: java.util.concurrent.ExecutionException: java.lang.NullPointerException
>>>>>         at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>>>>         at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>>>>         at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>>>>>         at org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:66)
>>>>>         at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:89)
>>>>>         ... 7 more
>>>>>     Caused by: java.lang.NullPointerException
>>>>>         at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:954)
>>>>>         at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:825)
>>>>>         at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:888)
>>>>>         at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:820)
>>>>>         at org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot.writeMappingsInKeyGroup(CopyOnWriteStateTableSnapshot.java:196)
>>>>>         at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:390)
>>>>>         at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:339)
>>>>>         at org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources.call(AbstractAsyncCallableWithResources.java:75)
>>>>>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>         at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
>>>>>         at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
>>>>>         ... 5 more
>>>>>     [CIRCULAR REFERENCE:java.lang.NullPointerException]
>>>>> 
>>>>> Any ideas on why I'm hitting this, especially when FLINK-7756 (https://issues.apache.org/jira/browse/FLINK-7756) says it has been fixed in 1.4.2?
>>>>> 
>>>> 
>>> 
> 
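The defect Stefan describes in the quoted message above, which he says was merged as FLINK-10816, can be illustrated in a few lines. This is a minimal, self-contained sketch under stated assumptions, not Flink code: `Serializer`, `WrapperSerializer` and `StatefulElementSerializer` are hypothetical stand-ins for `TypeSerializer`, `LockableTypeSerializer` and a stateful (for example Kryo-backed) element serializer. The shallow variant returns a new wrapper that still shares the inner instance; the fixed variant also duplicates the wrapped element serializer.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified serializer contract (NOT Flink's TypeSerializer).
interface Serializer<T> {
    // Returns an instance that may safely be used by another thread.
    Serializer<T> duplicate();
}

// Hypothetical stateful element serializer, standing in for a Kryo-backed one.
final class StatefulElementSerializer implements Serializer<Object> {
    // Mutable scratch state: one instance must not be shared across threads.
    final List<Object> scratch = new ArrayList<>();

    @Override
    public Serializer<Object> duplicate() {
        return new StatefulElementSerializer(); // fresh state for the new owner
    }
}

// Hypothetical wrapper in the role of LockableTypeSerializer.
final class WrapperSerializer<E> implements Serializer<E> {
    final Serializer<E> elementSerializer;

    WrapperSerializer(Serializer<E> elementSerializer) {
        this.elementSerializer = elementSerializer;
    }

    // Buggy variant: a new wrapper that still shares the stateful inner instance.
    WrapperSerializer<E> shallowDuplicate() {
        return new WrapperSerializer<E>(elementSerializer);
    }

    // Fixed variant: duplicate the wrapped element serializer as well.
    @Override
    public WrapperSerializer<E> duplicate() {
        Serializer<E> dup = elementSerializer.duplicate();
        // A stateless inner serializer may return itself; then the wrapper can too.
        return dup == elementSerializer ? this : new WrapperSerializer<E>(dup);
    }
}

public class DuplicateDemo {
    public static void main(String[] args) {
        WrapperSerializer<Object> original =
                new WrapperSerializer<>(new StatefulElementSerializer());

        WrapperSerializer<Object> shallow = original.shallowDuplicate();
        WrapperSerializer<Object> deep = original.duplicate();

        // Prints true: both owners would mutate the same scratch state.
        System.out.println("shallow shares inner: " + (shallow.elementSerializer == original.elementSerializer));
        // Prints false: the duplicate owns its own element serializer.
        System.out.println("deep shares inner:    " + (deep.elementSerializer == original.elementSerializer));
    }
}
```

Returning `this` when the inner `duplicate()` hands back the same instance (that is, the inner serializer is stateless) skips a needless allocation; here it is only a design choice of the sketch.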


Re: FlinkCEP, circular references and checkpointing failures

Posted by Shailesh Jain <sh...@stellapps.com>.
Thanks a lot for looking into this issue, Stefan.

Could you please let me know the issue ID once you open it? It'll help me
understand the problem better, and I could also run a quick test in our
environment once the issue is resolved.

Thanks,
Shailesh
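The report earlier in the thread that the failures occur only with checkpointing enabled fits Stefan's diagnosis: the stack traces show the snapshot being written by `StreamTask$AsyncCheckpointRunnable` on a thread-pool thread, so a shared serializer is presumably touched by a second thread only while a checkpoint is in flight. As a hedged testing aid (not something Flink provides in this form), a codec can record its owning thread and fail fast on cross-thread use; `ConfinedCodec`, `useOnSnapshotThread` and the thread name are hypothetical.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical codec that records the first thread using it (not a Flink class).
final class ConfinedCodec {
    private Thread owner;

    // Fails fast if a second thread ever uses this instance.
    synchronized void use() {
        Thread current = Thread.currentThread();
        if (owner == null) {
            owner = current;
        } else if (owner != current) {
            throw new IllegalStateException(
                    "shared between " + owner.getName() + " and " + current.getName());
        }
    }

    // A proper duplicate gets fresh, unowned state.
    ConfinedCodec duplicate() {
        return new ConfinedCodec();
    }
}

public class ConfinementDemo {
    // Uses the codec on a separate thread, standing in for an async snapshot.
    static String useOnSnapshotThread(ConfinedCodec codec) {
        AtomicReference<String> result = new AtomicReference<>("ok");
        Thread snapshot = new Thread(() -> {
            try {
                codec.use();
            } catch (IllegalStateException e) {
                result.set("failed: " + e.getMessage());
            }
        }, "snapshot-thread");
        snapshot.start();
        try {
            snapshot.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return "interrupted";
        }
        return result.get();
    }

    public static void main(String[] args) {
        ConfinedCodec processing = new ConfinedCodec();
        processing.use(); // the task (main) thread claims ownership first

        System.out.println(useOnSnapshotThread(processing));             // shared instance: fails
        System.out.println(useOnSnapshotThread(processing.duplicate())); // duplicate: ok
    }
}
```

The check flags any use from a second thread, which is stricter than detecting truly simultaneous access, so it suits tests better than production code.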

On Wed, Nov 7, 2018, 10:46 PM Till Rohrmann <trohrmann@apache.org> wrote:

> Really good find, Stefan!
>
> On Wed, Nov 7, 2018 at 5:28 PM Stefan Richter <s....@data-artisans.com>
> wrote:
>
>> Hi,
>>
>> I think I can already spot the
>> problem: LockableTypeSerializer.duplicate() is not properly implemented
>> because it also has to call duplicate() on the element serialiser that is
>> passed into the constructor of the new instance. I will open an issue and
>> fix the problem.
>>
>> Best,
>> Stefan
>>
>> On 7. Nov 2018, at 17:17, Till Rohrmann <tr...@apache.org> wrote:
>>
>> Hi Shailesh,
>>
>> could you maybe provide us with an example program which is able to
>> reproduce this problem? This would help the community to better debug the
>> problem. It looks not right and might point towards a bug in Flink. Thanks
>> a lot!
>>
>> Cheers,
>> Till
>>
>> On Tue, Oct 30, 2018 at 9:10 AM Dawid Wysakowicz <dw...@apache.org>
>> wrote:
>>
>>> This is some problem with serializing your events using Kryo. I'm adding
>>> Gordon to cc, as he was recently working with serializers. He might give
>>> you more insights what is going wrong.
>>>
>>> Best,
>>>
>>> Dawid
>>> On 25/10/2018 05:41, Shailesh Jain wrote:
>>>
>>> Hi Dawid,
>>>
>>> I've upgraded to flink 1.6.1 and rebased by changes against the tag
>>> 1.6.1, the only commit on top of 1.6 is this:
>>> https://github.com/jainshailesh/flink/commit/797e3c4af5b28263fd98fb79daaba97cabf3392c
>>>
>>> I ran two separate identical jobs (with and without checkpointing
>>> enabled), I'm hitting a ArrayIndexOutOfBoundsException (and sometimes NPE) *only
>>> when checkpointing (HDFS backend) is enabled*, with the below stack
>>> trace.
>>>
>>> I did see a similar problem with different operators here (
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/IndexOutOfBoundsException-on-deserialization-after-updating-to-1-6-1-td23933.html).
>>> Is this a known issue which is getting addressed?
>>>
>>> Any ideas on what could be causing this?
>>>
>>> Thanks,
>>> Shailesh
>>>
>>>
>>> 2018-10-24 17:04:13,365 INFO
>>> org.apache.flink.runtime.taskmanager.Task                     -
>>> SelectCepOperatorMixedTime (1/1) - SelectCepOperatorMixedTime (1/1)
>>> (3d984b7919342a3886593401088ca2cd) switched from RUNNING to FAILED.
>>> org.apache.flink.util.FlinkRuntimeException: Failure happened in filter
>>> function.
>>>         at org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:731)
>>>         at org.apache.flink.cep.nfa.NFA.computeNextStates(NFA.java:541)
>>>         at org.apache.flink.cep.nfa.NFA.doProcess(NFA.java:284)
>>>         at org.apache.flink.cep.nfa.NFA.process(NFA.java:220)
>>>         at
>>> org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.processEvent(AbstractKeyedCEPPatternOperator.java:379)
>>>         at
>>> org.apache.flink.cep.operator.AbstractKeyedCEPPatternMixedTimeApproachOperator.processElement(AbstractKeyedCEPPatternMixedTimeApproachOperator.java:45)
>>>         at
>>> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>>>         at
>>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>>>         at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>>>         at java.lang.Thread.run(Thread.java:748)
>>> Caused by: org.apache.flink.util.WrappingRuntimeException:
>>> java.lang.ArrayIndexOutOfBoundsException: -1
>>>         at
>>> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.lambda$materializeMatch$1(SharedBuffer.java:305)
>>>         at java.util.HashMap.computeIfAbsent(HashMap.java:1127)
>>>         at
>>> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.materializeMatch(SharedBuffer.java:301)
>>>         at
>>> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.materializeMatch(SharedBuffer.java:291)
>>>         at
>>> org.apache.flink.cep.nfa.NFA$ConditionContext.getEventsForPattern(NFA.java:811)
>>>         at
>>> com.stellapps.contrakcep.flink.patterns.critical.AgitatorMalfunctionAfterChillingPattern$1.filter(AgitatorMalfunctionAfterChillingPattern.java:70)
>>>         at
>>> com.stellapps.contrakcep.flink.patterns.critical.AgitatorMalfunctionAfterChillingPattern$1.filter(AgitatorMalfunctionAfterChillingPattern.java:62)
>>>         at
>>> org.apache.flink.cep.nfa.NFA.checkFilterCondition(NFA.java:742)
>>>         at org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:716)
>>>         ... 10 more
>>> Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
>>>         at com.esotericsoftware.kryo.util.IntArray.add(IntArray.java:61)
>>>         at
>>> com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:800)
>>>         at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:655)
>>>         at
>>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:231)
>>>         at
>>> org.apache.flink.cep.nfa.sharedbuffer.Lockable$LockableTypeSerializer.copy(Lockable.java:120)
>>>         at
>>> org.apache.flink.cep.nfa.sharedbuffer.Lockable$LockableTypeSerializer.copy(Lockable.java:95)
>>>         at
>>> org.apache.flink.api.common.typeutils.base.MapSerializer.copy(MapSerializer.java:113)
>>>         at
>>> org.apache.flink.api.common.typeutils.base.MapSerializer.copy(MapSerializer.java:49)
>>>         at
>>> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:287)
>>>         at
>>> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:311)
>>>         at
>>> org.apache.flink.runtime.state.heap.HeapMapState.get(HeapMapState.java:85)
>>>         at
>>> org.apache.flink.runtime.state.UserFacingMapState.get(UserFacingMapState.java:47)
>>>         at
>>> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.lambda$materializeMatch$1(SharedBuffer.java:303)
>>>         ... 18 more
>>>
>>> On Fri, Sep 28, 2018 at 11:00 AM Shailesh Jain <
>>> shailesh.jain@stellapps.com> wrote:
>>>
>>>> Hi Dawid,
>>>>
>>>> Thanks for your time on this. The diff should have pointed out only the
>>>> top 3 commits, but since it did not, it is possible I did not rebase my
>>>> branch against 1.4.2 correctly. I'll check this out and get back to you if
>>>> I hit the same issue again.
>>>>
>>>> Thanks again,
>>>> Shailesh
>>>>
>>>> On Thu, Sep 27, 2018 at 1:00 PM Dawid Wysakowicz <
>>>> dwysakowicz@apache.org> wrote:
>>>>
>>>>> Hi Shailesh,
>>>>>
>>>>> I am afraid it is gonna be hard to help you, as this branch differs
>>>>> significantly from 1.4.2 release (I've done diff across your branch and
>>>>> tag/release-1.4.2). Moreover the code in the branch you've provided still
>>>>> does not correspond to the lines in the exception you've posted previously.
>>>>> Could you check if the problem occurs on vanilla flink as well?
>>>>>
>>>>> Best,
>>>>>
>>>>> Dawid
>>>>>
>>>>> On 27/09/18 08:22, Shailesh Jain wrote:
>>>>>
>>>>> Hi Dawid,
>>>>>
>>>>> Yes, it is version 1.4.2. We are running vanilla flink, but have added
>>>>> a couple of changes in the CEP operator specifically (top 3 commits here:
>>>>> https://github.com/jainshailesh/flink/commits/poc_on_1.4.2). Changes
>>>>> I've made to CEP operators do not touch the checkpointing path, just
>>>>> overloading the operator for a specific way of handling event time.
>>>>>
>>>>> We are hitting this in production, so I'm not sure it'll be feasible
>>>>> to move to 1.6.0 immediately, but eventually yes.
>>>>>
>>>>> Thanks,
>>>>> Shailesh
>>>>>
>>>>> On Wed, Sep 26, 2018 at 5:44 PM Dawid Wysakowicz <
>>>>> dwysakowicz@apache.org> wrote:
>>>>>
>>>>>> Hi Shailesh,
>>>>>>
>>>>>> Are you sure you are using version 1.4.2? Do you run a vanilla flink,
>>>>>> or have you introduced some changes? I am asking cause the lines in
>>>>>> stacktrace does not align with the source code for 1.4.2.
>>>>>>
>>>>>> Also it is a different exception than the one in the issue you've
>>>>>> linked, so if it is a problem than it is definitely a different one. Last
>>>>>> thing I would recommend upgrading to the newest version, as we rewritten
>>>>>> the SharedBuffer implementation in 1.6.0.
>>>>>>
>>>>>> Best,
>>>>>>
>>>>>> Dawid
>>>>>>
>>>>>> On 26/09/18 13:50, Shailesh Jain wrote:
>>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I think I've hit this same issue on a 3 node standalone cluster
>>>>>> (1.4.2) using HDFS (2.8.4) as state backend.
>>>>>>
>>>>>> 2018-09-26 17:07:39,370 INFO
>>>>>> org.apache.flink.runtime.taskmanager.Task                     - Attempting
>>>>>> to fail task externally SelectCepOperator (1/1)
>>>>>> (3bec4aa1ef2226c4e0c5ff7b3860d340).
>>>>>> 2018-09-26 17:07:39,370 INFO
>>>>>> org.apache.flink.runtime.taskmanager.Task                     -
>>>>>> SelectCepOperator (1/1) (3bec4aa1ef2226c4e0c5ff7b3860d340) switched from
>>>>>> RUNNING to FAILED.
>>>>>> AsynchronousException{java.lang.Exception: Could not materialize
>>>>>> checkpoint 6 for operator SelectCepOperator (1/1).}
>>>>>>     at
>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:948)
>>>>>>     at
>>>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>>     at
>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>>     at
>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>>> Caused by: java.lang.Exception: Could not materialize checkpoint 6
>>>>>> for operator SelectCepOperator (1/1).
>>>>>>     ... 6 more
>>>>>> Caused by: java.util.concurrent.ExecutionException:
>>>>>> java.lang.NullPointerException
>>>>>>     at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>>>>>     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>>>>>     at
>>>>>> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>>>>>>     at
>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
>>>>>>     ... 5 more
>>>>>>     Suppressed: java.lang.Exception: Could not properly cancel
>>>>>> managed keyed state future.
>>>>>>         at
>>>>>> org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:91)
>>>>>>         at
>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:976)
>>>>>>         at
>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:939)
>>>>>>         ... 5 more
>>>>>>     Caused by: java.util.concurrent.ExecutionException:
>>>>>> java.lang.NullPointerException
>>>>>>         at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>>>>>         at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>>>>>         at
>>>>>> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>>>>>>         at
>>>>>> org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:66)
>>>>>>         at
>>>>>> org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:89)
>>>>>>         ... 7 more
>>>>>>     Caused by: java.lang.NullPointerException
>>>>>>         at
>>>>>> org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:954)
>>>>>>         at
>>>>>> org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:825)
>>>>>>         at
>>>>>> org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:888)
>>>>>>         at
>>>>>> org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:820)
>>>>>>         at
>>>>>> org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot.writeMappingsInKeyGroup(CopyOnWriteStateTableSnapshot.java:196)
>>>>>>         at
>>>>>> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:390)
>>>>>>         at
>>>>>> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:339)
>>>>>>         at
>>>>>> org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources.call(AbstractAsyncCallableWithResources.java:75)
>>>>>>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>>         at
>>>>>> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
>>>>>>         at
>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
>>>>>>         ... 5 more
>>>>>>     [CIRCULAR REFERENCE:java.lang.NullPointerException]
>>>>>>
>>>>>> Any ideas on why I'm hitting this, especially when this (
>>>>>> https://issues.apache.org/jira/browse/FLINK-7756) says it has been
>>>>>> fixed in 1.4.2?
>>>>>>
>>>>>> On Sat, Nov 4, 2017 at 12:34 AM Federico D'Ambrosio <
>>>>>> federico.dambrosio@smartlab.ws> wrote:
>>>>>>
>>>>>>> Thank you very much for your steady response, Kostas!
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Federico
>>>>>>>
>>>>>>> 2017-11-03 16:26 GMT+01:00 Kostas Kloudas <
>>>>>>> k.kloudas@data-artisans.com>:
>>>>>>>
>>>>>>>> Hi Federico,
>>>>>>>>
>>>>>>>> Thanks for trying it out!
>>>>>>>> Great to hear that your problem was fixed!
>>>>>>>>
>>>>>>>> The feature freeze for the release is going to be next week, and I
>>>>>>>> would expect 1 or 2 more weeks of testing.
>>>>>>>> So I would say in 2.5 weeks. But this is of course subject to
>>>>>>>> potential issues we may find during testing.
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Kostas
>>>>>>>>
>>>>>>>> On Nov 3, 2017, at 4:22 PM, Federico D'Ambrosio <
>>>>>>>> federico.dambrosio@smartlab.ws> wrote:
>>>>>>>>
>>>>>>>> Hi Kostas,
>>>>>>>>
>>>>>>>> I just tried running the same job with 1.4-SNAPSHOT for 10 minutes
>>>>>>>> and it didn't crash, so it was the same underlying issue as the JIRA you
>>>>>>>> linked.
>>>>>>>>
>>>>>>>> Do you happen to know when the 1.4 stable release is expected?
>>>>>>>>
>>>>>>>> Thank you very much,
>>>>>>>> Federico
>>>>>>>>
>>>>>>>> 2017-11-03 15:25 GMT+01:00 Kostas Kloudas <
>>>>>>>> k.kloudas@data-artisans.com>:
>>>>>>>>
>>>>>>>>> Perfect! Thanks a lot!
>>>>>>>>>
>>>>>>>>> Kostas
>>>>>>>>>
>>>>>>>>> On Nov 3, 2017, at 3:23 PM, Federico D'Ambrosio <
>>>>>>>>> federico.dambrosio@smartlab.ws> wrote:
>>>>>>>>>
>>>>>>>>> Hi Kostas,
>>>>>>>>>
>>>>>>>>> yes, I'm using 1.3.2. I'll try the current master and I'll get
>>>>>>>>> back to you.
>>>>>>>>>
>>>>>>>>> 2017-11-03 15:21 GMT+01:00 Kostas Kloudas <
>>>>>>>>> k.kloudas@data-artisans.com>:
>>>>>>>>>
>>>>>>>>>> Hi Federico,
>>>>>>>>>>
>>>>>>>>>> I assume that you are using Flink 1.3, right?
>>>>>>>>>>
>>>>>>>>>> In this case, in 1.4 we have fixed a bug that seems similar to
>>>>>>>>>> your case:
>>>>>>>>>> https://issues.apache.org/jira/browse/FLINK-7756
>>>>>>>>>>
>>>>>>>>>> Could you try the current master to see if it fixes your problem?
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Kostas
>>>>>>>>>>
>>>>>>>>>> On Nov 3, 2017, at 3:12 PM, Federico D'Ambrosio <
>>>>>>>>>> federico.dambrosio@smartlab.ws> wrote:
>>>>>>>>>>
>>>>>>>>>>  Could not find id for
>>>>>>>>>> entry:
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Federico D'Ambrosio
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Federico D'Ambrosio
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Federico D'Ambrosio
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>

Re: FlinkCEP, circular references and checkpointing failures

Posted by Till Rohrmann <tr...@apache.org>.
Really good find, Stefan!

On Wed, Nov 7, 2018 at 5:28 PM Stefan Richter <s....@data-artisans.com>
wrote:

> Hi,
>
> I think I can already spot the problem: LockableTypeSerializer.duplicate()
> is not properly implemented because it also has to call duplicate() on the
> element serialiser that is passed into the constructor of the new instance.
> I will open an issue and fix the problem.
>
> Best,
> Stefan
>

Re: FlinkCEP, circular references and checkpointing failures

Posted by Stefan Richter <s....@data-artisans.com>.
Hi,

I think I can already spot the problem: LockableTypeSerializer.duplicate() is not properly implemented because it also has to call duplicate() on the element serialiser that is passed into the constructor of the new instance. I will open an issue and fix the problem.

Best,
Stefan
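
To make the diagnosis above concrete: a serializer's duplicate() is meant to hand out an instance that another thread (for example an asynchronous snapshot thread) can use independently, so a wrapper that only re-wraps the same nested serializer still shares that nested serializer's mutable state. The following is a minimal, self-contained sketch under that assumption. `Serializer`, `StatefulSerializer` and `WrapperSerializer` are hypothetical stand-ins, not Flink's `TypeSerializer` or `LockableTypeSerializer`.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/**
 * Hypothetical sketch, not Flink code: minimal stand-ins for a serializer
 * contract with copy() and duplicate(), showing shallow vs. deep duplication.
 */
public class DeepDuplicateSketch {

    /** Simplified contract: duplicate() must return an instance safe to hand to another thread. */
    interface Serializer<T> {
        T copy(T value);

        Serializer<T> duplicate();
    }

    /** Stands in for a stateful serializer (e.g. a Kryo-backed one) that keeps mutable scratch state. */
    static final class StatefulSerializer implements Serializer<StringBuilder> {
        // Per-instance working stack; concurrent use from two threads could corrupt it.
        private final Deque<Object> workStack = new ArrayDeque<>();

        @Override
        public StringBuilder copy(StringBuilder value) {
            workStack.push(value);
            try {
                return new StringBuilder(value);
            } finally {
                workStack.pop();
            }
        }

        @Override
        public StatefulSerializer duplicate() {
            // Stateful, so every duplicate gets its own fresh working state.
            return new StatefulSerializer();
        }
    }

    /** Hypothetical wrapper that delegates to an element serializer passed into its constructor. */
    static final class WrapperSerializer<T> implements Serializer<T> {
        final Serializer<T> elementSerializer;

        WrapperSerializer(Serializer<T> elementSerializer) {
            this.elementSerializer = elementSerializer;
        }

        @Override
        public T copy(T value) {
            return elementSerializer.copy(value);
        }

        /** Buggy variant: the new wrapper still shares the stateful element serializer. */
        WrapperSerializer<T> shallowDuplicate() {
            return new WrapperSerializer<>(elementSerializer);
        }

        /** Fixed variant: duplicate the nested serializer too, so no mutable state is shared. */
        @Override
        public WrapperSerializer<T> duplicate() {
            return new WrapperSerializer<>(elementSerializer.duplicate());
        }
    }

    public static void main(String[] args) {
        WrapperSerializer<StringBuilder> original = new WrapperSerializer<>(new StatefulSerializer());

        WrapperSerializer<StringBuilder> shallow = original.shallowDuplicate();
        WrapperSerializer<StringBuilder> deep = original.duplicate();

        // The shallow duplicate gives a second owner the very same element serializer.
        System.out.println("shallow shares element serializer: "
                + (shallow.elementSerializer == original.elementSerializer));
        // The deep duplicate does not.
        System.out.println("deep shares element serializer: "
                + (deep.elementSerializer == original.elementSerializer));
    }
}
```

Running `main` prints `true` for the shallow variant and `false` for the deep one. Flink's `TypeSerializer.duplicate()` contract allows a stateless serializer to return itself, so a real fix could also return the wrapper unchanged when the nested `duplicate()` returns the same instance; that refinement is left out of this sketch.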

> On 7. Nov 2018, at 17:17, Till Rohrmann <tr...@apache.org> wrote:
> 
> Hi Shailesh,
> 
> could you maybe provide us with an example program which is able to reproduce this problem? This would help the community to better debug the problem. It does not look right and might point to a bug in Flink. Thanks a lot!
> 
> Cheers,
> Till
> 
> On Tue, Oct 30, 2018 at 9:10 AM Dawid Wysakowicz <dwysakowicz@apache.org> wrote:
> This is some problem with serializing your events using Kryo. I'm adding Gordon to cc, as he was recently working with serializers. He might give you more insight into what is going wrong.
> 
> Best,
> 
> Dawid
> 
> On 25/10/2018 05:41, Shailesh Jain wrote:
>> Hi Dawid,
>> 
>> I've upgraded to Flink 1.6.1 and rebased my changes against the 1.6.1 tag; the only commit on top of 1.6 is this: https://github.com/jainshailesh/flink/commit/797e3c4af5b28263fd98fb79daaba97cabf3392c
>> 
>> I ran two separate identical jobs (with and without checkpointing enabled). I'm hitting an ArrayIndexOutOfBoundsException (and sometimes an NPE) only when checkpointing (HDFS backend) is enabled, with the stack trace below.
>> 
>> I did see a similar problem with different operators here (http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/IndexOutOfBoundsException-on-deserialization-after-updating-to-1-6-1-td23933.html). Is this a known issue that is being addressed?
>> 
>> Any ideas on what could be causing this?
>> 
>> Thanks,
>> Shailesh
>> 
>> 
>> 2018-10-24 17:04:13,365 INFO  org.apache.flink.runtime.taskmanager.Task                     - SelectCepOperatorMixedTime (1/1) - SelectCepOperatorMixedTime (1/1) (3d984b7919342a3886593401088ca2cd) switched from RUNNING to FAILED.
>> org.apache.flink.util.FlinkRuntimeException: Failure happened in filter function.
>>         at org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:731)
>>         at org.apache.flink.cep.nfa.NFA.computeNextStates(NFA.java:541)
>>         at org.apache.flink.cep.nfa.NFA.doProcess(NFA.java:284)
>>         at org.apache.flink.cep.nfa.NFA.process(NFA.java:220)
>>         at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.processEvent(AbstractKeyedCEPPatternOperator.java:379)
>>         at org.apache.flink.cep.operator.AbstractKeyedCEPPatternMixedTimeApproachOperator.processElement(AbstractKeyedCEPPatternMixedTimeApproachOperator.java:45)
>>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>>         at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>>         at java.lang.Thread.run(Thread.java:748)
>> Caused by: org.apache.flink.util.WrappingRuntimeException: java.lang.ArrayIndexOutOfBoundsException: -1
>>         at org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.lambda$materializeMatch$1(SharedBuffer.java:305)
>>         at java.util.HashMap.computeIfAbsent(HashMap.java:1127)
>>         at org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.materializeMatch(SharedBuffer.java:301)
>>         at org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.materializeMatch(SharedBuffer.java:291)
>>         at org.apache.flink.cep.nfa.NFA$ConditionContext.getEventsForPattern(NFA.java:811)
>>         at com.stellapps.contrakcep.flink.patterns.critical.AgitatorMalfunctionAfterChillingPattern$1.filter(AgitatorMalfunctionAfterChillingPattern.java:70)
>>         at com.stellapps.contrakcep.flink.patterns.critical.AgitatorMalfunctionAfterChillingPattern$1.filter(AgitatorMalfunctionAfterChillingPattern.java:62)
>>         at org.apache.flink.cep.nfa.NFA.checkFilterCondition(NFA.java:742)
>>         at org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:716)
>>         ... 10 more
>> Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
>>         at com.esotericsoftware.kryo.util.IntArray.add(IntArray.java:61)
>>         at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:800)
>>         at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:655)
>>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:231)
>>         at org.apache.flink.cep.nfa.sharedbuffer.Lockable$LockableTypeSerializer.copy(Lockable.java:120)
>>         at org.apache.flink.cep.nfa.sharedbuffer.Lockable$LockableTypeSerializer.copy(Lockable.java:95)
>>         at org.apache.flink.api.common.typeutils.base.MapSerializer.copy(MapSerializer.java:113)
>>         at org.apache.flink.api.common.typeutils.base.MapSerializer.copy(MapSerializer.java:49)
>>         at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:287)
>>         at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:311)
>>         at org.apache.flink.runtime.state.heap.HeapMapState.get(HeapMapState.java:85)
>>         at org.apache.flink.runtime.state.UserFacingMapState.get(UserFacingMapState.java:47)
>>         at org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.lambda$materializeMatch$1(SharedBuffer.java:303)
>>         ... 18 more
>> 
>> On Fri, Sep 28, 2018 at 11:00 AM Shailesh Jain <shailesh.jain@stellapps.com> wrote:
>> Hi Dawid,
>> 
>> Thanks for your time on this. The diff should have pointed out only the top 3 commits, but since it did not, it is possible I did not rebase my branch against 1.4.2 correctly. I'll check this out and get back to you if I hit the same issue again.
>> 
>> Thanks again,
>> Shailesh
>> 
>> On Thu, Sep 27, 2018 at 1:00 PM Dawid Wysakowicz <dwysakowicz@apache.org> wrote:
>> Hi Shailesh,
>> 
>> I am afraid it is going to be hard to help you, as this branch differs significantly from the 1.4.2 release (I've diffed your branch against tag/release-1.4.2). Moreover, the code in the branch you've provided still does not correspond to the lines in the exception you posted previously. Could you check whether the problem also occurs on vanilla Flink?
>> 
>> Best,
>> 
>> Dawid
>> 
>> 
>> On 27/09/18 08:22, Shailesh Jain wrote:
>>> Hi Dawid,
>>> 
>>> Yes, it is version 1.4.2. We are running vanilla Flink, but have added a couple of changes to the CEP operator specifically (top 3 commits here: https://github.com/jainshailesh/flink/commits/poc_on_1.4.2). The changes I've made to the CEP operators do not touch the checkpointing path; they just extend the operator to handle event time in a specific way.
>>> 
>>> We are hitting this in production, so I'm not sure it'll be feasible to move to 1.6.0 immediately, but eventually yes.
>>> 
>>> Thanks,
>>> Shailesh
>>> 
>>> On Wed, Sep 26, 2018 at 5:44 PM Dawid Wysakowicz <dwysakowicz@apache.org> wrote:
>>> Hi Shailesh,
>>> 
>>> Are you sure you are using version 1.4.2? Do you run a vanilla Flink, or have you introduced some changes? I am asking because the line numbers in the stack trace do not align with the source code for 1.4.2.
>>> 
>>> Also, it is a different exception than the one in the issue you've linked, so if it is a problem, it is definitely a different one. Lastly, I would recommend upgrading to the newest version, as we rewrote the SharedBuffer implementation in 1.6.0.
>>> 
>>> Best,
>>> 
>>> Dawid
>>> 
>>> 
>>> On 26/09/18 13:50, Shailesh Jain wrote:
>>>> Hi,
>>>> 
>>>> I think I've hit this same issue on a 3-node standalone cluster (1.4.2) using HDFS (2.8.4) as the state backend.
>>>> 
>>>> 2018-09-26 17:07:39,370 INFO  org.apache.flink.runtime.taskmanager.Task                     - Attempting to fail task externally SelectCepOperator (1/1) (3bec4aa1ef2226c4e0c5ff7b3860d340).
>>>> 2018-09-26 17:07:39,370 INFO  org.apache.flink.runtime.taskmanager.Task                     - SelectCepOperator (1/1) (3bec4aa1ef2226c4e0c5ff7b3860d340) switched from RUNNING to FAILED.
>>>> AsynchronousException{java.lang.Exception: Could not materialize checkpoint 6 for operator SelectCepOperator (1/1).}
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:948)
>>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>     at java.lang.Thread.run(Thread.java:748)
>>>> Caused by: java.lang.Exception: Could not materialize checkpoint 6 for operator SelectCepOperator (1/1).
>>>>     ... 6 more
>>>> Caused by: java.util.concurrent.ExecutionException: java.lang.NullPointerException
>>>>     at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>>>     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>>>     at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
>>>>     ... 5 more
>>>>     Suppressed: java.lang.Exception: Could not properly cancel managed keyed state future.
>>>>         at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:91)
>>>>         at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:976)
>>>>         at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:939)
>>>>         ... 5 more
>>>>     Caused by: java.util.concurrent.ExecutionException: java.lang.NullPointerException
>>>>         at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>>>         at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>>>         at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>>>>         at org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:66)
>>>>         at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:89)
>>>>         ... 7 more
>>>>     Caused by: java.lang.NullPointerException
>>>>         at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:954)
>>>>         at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:825)
>>>>         at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:888)
>>>>         at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:820)
>>>>         at org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot.writeMappingsInKeyGroup(CopyOnWriteStateTableSnapshot.java:196)
>>>>         at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:390)
>>>>         at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:339)
>>>>         at org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources.call(AbstractAsyncCallableWithResources.java:75)
>>>>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>         at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
>>>>         at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
>>>>         ... 5 more
>>>>     [CIRCULAR REFERENCE:java.lang.NullPointerException]
>>>> 
>>>> Any ideas on why I'm hitting this, especially when this (https://issues.apache.org/jira/browse/FLINK-7756) says it has been fixed in 1.4.2?
>>>> 
>>>> On Sat, Nov 4, 2017 at 12:34 AM Federico D'Ambrosio <federico.dambrosio@smartlab.ws> wrote:
>>>> Thank you very much for your steady response, Kostas!
>>>> 
>>>> Cheers,
>>>> Federico
>>>> 
>>>> 2017-11-03 16:26 GMT+01:00 Kostas Kloudas <k.kloudas@data-artisans.com>:
>>>> Hi Federico,
>>>> 
>>>> Thanks for trying it out!
>>>> Great to hear that your problem was fixed!
>>>> 
>>>> The feature freeze for the release is going to be next week, and I would expect 1 or 2 more weeks of testing.
>>>> So I would say in 2.5 weeks. But this is of course subject to potential issues we may find during testing.
>>>> 
>>>> Cheers,
>>>> Kostas
>>>> 
>>>>> On Nov 3, 2017, at 4:22 PM, Federico D'Ambrosio <federico.dambrosio@smartlab.ws> wrote:
>>>>> 
>>>>> Hi Kostas,
>>>>> 
>>>>> I just tried running the same job with 1.4-SNAPSHOT for 10 minutes and it didn't crash, so it was the same underlying issue as the JIRA you linked.
>>>>> 
>>>>> Do you happen to know when the 1.4 stable release is expected?
>>>>> 
>>>>> Thank you very much,
>>>>> Federico
>>>>> 
>>>>> 2017-11-03 15:25 GMT+01:00 Kostas Kloudas <k.kloudas@data-artisans.com>:
>>>>> Perfect! Thanks a lot!
>>>>> 
>>>>> Kostas
>>>>> 
>>>>>> On Nov 3, 2017, at 3:23 PM, Federico D'Ambrosio <federico.dambrosio@smartlab.ws> wrote:
>>>>>> 
>>>>>> Hi Kostas, 
>>>>>> 
>>>>>> yes, I'm using 1.3.2. I'll try the current master and I'll get back to you.
>>>>>> 
>>>>>> 2017-11-03 15:21 GMT+01:00 Kostas Kloudas <k.kloudas@data-artisans.com>:
>>>>>> Hi Federico,
>>>>>> 
>>>>>> I assume that you are using Flink 1.3, right?
>>>>>> 
>>>>>> In this case, in 1.4 we have fixed a bug that seems similar to your case:
>>>>>> https://issues.apache.org/jira/browse/FLINK-7756
>>>>>> 
>>>>>> Could you try the current master to see if it fixes your problem?
>>>>>> 
>>>>>> Thanks,
>>>>>> Kostas
>>>>>> 
>>>>>>> On Nov 3, 2017, at 3:12 PM, Federico D'Ambrosio <federico.dambrosio@smartlab.ws <ma...@smartlab.ws>> wrote:
>>>>>>> 
>>>>>>>  Could not find id for entry:                                                        
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> -- 
>>>>>> Federico D'Ambrosio
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> -- 
>>>>> Federico D'Ambrosio
>>>> 
>>>> 
>>>> 
>>>> 
>>>> -- 
>>>> Federico D'Ambrosio
>>> 
>> 
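
Dawid later asks (further down in this thread) whether the cluster really runs vanilla 1.4.2, because the line numbers in the trace don't match that release. One way to answer that is to ask the JVM where the CEP classes were actually loaded from. The following is a minimal plain-Java sketch of that idea, not a Flink utility: WhichJar and describe() are hypothetical names, and the Implementation-Version shown is only whatever the jar's manifest declares (it may be n/a). Ideally, call describe() from inside the job, for example on org.apache.flink.cep.nfa.NFA, and log the result, since the job's user-code classloader can resolve classes differently than a standalone run; if flink-cep is bundled into the job jar, the location should point at that jar rather than at the Flink distribution.

```java
import java.security.CodeSource;

/**
 * Hypothetical helper: reports where a class was loaded from and the
 * Implementation-Version recorded in its jar manifest (if any).
 */
public class WhichJar {

    public static String describe(Class<?> clazz) {
        // Bootstrap classes (e.g. java.lang.String) have no code source.
        CodeSource source = clazz.getProtectionDomain().getCodeSource();
        String location = (source == null || source.getLocation() == null)
                ? "<bootstrap/unknown>"
                : source.getLocation().toString();
        // getImplementationVersion() is null when the manifest does not set it.
        Package pkg = clazz.getPackage();
        String version = (pkg == null) ? null : pkg.getImplementationVersion();
        return clazz.getName() + " loaded from " + location
                + " (Implementation-Version: " + (version == null ? "n/a" : version) + ")";
    }

    public static void main(String[] args) throws ClassNotFoundException {
        // Defaults are only for a quick local run; pass e.g. org.apache.flink.cep.nfa.NFA
        // when running with the job's classpath.
        String[] names = args.length > 0
                ? args
                : new String[] {"java.lang.String", WhichJar.class.getName()};
        for (String name : names) {
            System.out.println(describe(Class.forName(name)));
        }
    }
}
```

Run without arguments, it just prints the entries for java.lang.String (no code source, reported as <bootstrap/unknown>) and for WhichJar itself.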


Re: FlinkCEP, circular references and checkpointing failures

Posted by Till Rohrmann <tr...@apache.org>.
Hi Shailesh,

could you provide us with an example program that reproduces this problem?
That would help the community debug it. It doesn't look right and might
point to a bug in Flink. Thanks a lot!

Cheers,
Till
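
In the 1.6.1 trace quoted below, KryoSerializer.copy is reached from CopyOnWriteStateTable.get via HeapMapState.get, i.e. the heap keyed state backend copies a CEP state value when the operator reads it. As we understand the heap backend, it only makes such copies while an asynchronous snapshot may still be reading the old object, which would fit the observation that the failure appears only with checkpointing enabled. The sketch below models just that copy-on-write idea in plain, single-threaded Java. It is not Flink's CopyOnWriteStateTable (which also runs the snapshot on a separate thread), and ToyCowTable, its version counters and the copier function (standing in for TypeSerializer#copy) are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

public class CowStateTableSketch {

    /** One state value plus the table version at which that value object was created. */
    static final class Slot<V> {
        V value;
        int version;

        Slot(V value, int version) {
            this.value = value;
            this.version = version;
        }
    }

    /** Hypothetical toy: copies a value on read only while a snapshot may still hold it. */
    static final class ToyCowTable<K, V> {
        private final Map<K, Slot<V>> slots = new HashMap<>();
        private final UnaryOperator<V> copier;  // stands in for TypeSerializer#copy
        private int tableVersion = 0;
        private int requiredSnapshotVersion = 0; // 0 = no snapshot in flight
        int copies = 0;                          // how often the copier ran

        ToyCowTable(UnaryOperator<V> copier) {
            this.copier = copier;
        }

        void put(K key, V value) {
            slots.put(key, new Slot<>(value, tableVersion));
        }

        V get(K key) {
            Slot<V> s = slots.get(key);
            if (s == null) {
                return null;
            }
            if (s.version < requiredSnapshotVersion) {
                // A running snapshot may still serialize s.value, so hand out a copy instead.
                s.value = copier.apply(s.value);
                s.version = tableVersion;
                copies++;
            }
            return s.value;
        }

        /** Starts a snapshot and returns the value objects it will serialize. */
        Map<K, V> startSnapshot() {
            tableVersion++;
            requiredSnapshotVersion = tableVersion;
            Map<K, V> view = new HashMap<>();
            for (Map.Entry<K, Slot<V>> e : slots.entrySet()) {
                view.put(e.getKey(), e.getValue().value);
            }
            return view;
        }

        void releaseSnapshot() {
            requiredSnapshotVersion = 0;
        }
    }

    public static void main(String[] args) {
        // Mutable lists play the role of buffered CEP events; "copying" builds a new list.
        ToyCowTable<String, List<Integer>> table =
                new ToyCowTable<String, List<Integer>>(list -> new ArrayList<>(list));
        table.put("key", new ArrayList<>(Arrays.asList(1, 2)));

        table.get("key").add(3);                 // no snapshot running: no copy
        Map<String, List<Integer>> snapshot = table.startSnapshot();
        table.get("key").add(4);                 // snapshot running: value is copied first
        table.releaseSnapshot();

        System.out.println("snapshot sees " + snapshot.get("key")); // [1, 2, 3]
        System.out.println("live state    " + table.get("key"));    // [1, 2, 3, 4]
        System.out.println("copies        " + table.copies);        // 1
    }
}
```

Before the snapshot starts, get() hands out the stored list itself; while the snapshot is in flight, the first get() copies it, so the snapshot keeps [1, 2, 3] while the live state moves on to [1, 2, 3, 4]. In the real trace, that copy step is where the event type goes through Kryo.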

On Tue, Oct 30, 2018 at 9:10 AM Dawid Wysakowicz <dw...@apache.org>
wrote:

> This is a problem with serializing your events using Kryo. I'm adding
> Gordon to cc, as he has recently been working on serializers. He might give
> you more insight into what is going wrong.
>
> Best,
>
> Dawid
> On 25/10/2018 05:41, Shailesh Jain wrote:
>
> Hi Dawid,
>
> I've upgraded to Flink 1.6.1 and rebased my changes onto the 1.6.1 tag;
> the only commit on top of 1.6 is this one:
> https://github.com/jainshailesh/flink/commit/797e3c4af5b28263fd98fb79daaba97cabf3392c
>
> I ran two otherwise identical jobs, one with and one without checkpointing.
> I'm hitting an ArrayIndexOutOfBoundsException (and sometimes an NPE) *only
> when checkpointing (HDFS backend) is enabled*, with the stack trace below.
>
> I did see a similar problem with different operators here:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/IndexOutOfBoundsException-on-deserialization-after-updating-to-1-6-1-td23933.html
> Is this a known issue that is being addressed?
>
> Any ideas on what could be causing this?
>
> Thanks,
> Shailesh
>
>
> 2018-10-24 17:04:13,365 INFO  org.apache.flink.runtime.taskmanager.Task                     - SelectCepOperatorMixedTime (1/1) - SelectCepOperatorMixedTime (1/1) (3d984b7919342a3886593401088ca2cd) switched from RUNNING to FAILED.
> org.apache.flink.util.FlinkRuntimeException: Failure happened in filter function.
>         at org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:731)
>         at org.apache.flink.cep.nfa.NFA.computeNextStates(NFA.java:541)
>         at org.apache.flink.cep.nfa.NFA.doProcess(NFA.java:284)
>         at org.apache.flink.cep.nfa.NFA.process(NFA.java:220)
>         at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.processEvent(AbstractKeyedCEPPatternOperator.java:379)
>         at org.apache.flink.cep.operator.AbstractKeyedCEPPatternMixedTimeApproachOperator.processElement(AbstractKeyedCEPPatternMixedTimeApproachOperator.java:45)
>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>         at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.WrappingRuntimeException: java.lang.ArrayIndexOutOfBoundsException: -1
>         at org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.lambda$materializeMatch$1(SharedBuffer.java:305)
>         at java.util.HashMap.computeIfAbsent(HashMap.java:1127)
>         at org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.materializeMatch(SharedBuffer.java:301)
>         at org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.materializeMatch(SharedBuffer.java:291)
>         at org.apache.flink.cep.nfa.NFA$ConditionContext.getEventsForPattern(NFA.java:811)
>         at com.stellapps.contrakcep.flink.patterns.critical.AgitatorMalfunctionAfterChillingPattern$1.filter(AgitatorMalfunctionAfterChillingPattern.java:70)
>         at com.stellapps.contrakcep.flink.patterns.critical.AgitatorMalfunctionAfterChillingPattern$1.filter(AgitatorMalfunctionAfterChillingPattern.java:62)
>         at org.apache.flink.cep.nfa.NFA.checkFilterCondition(NFA.java:742)
>         at org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:716)
>         ... 10 more
> Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
>         at com.esotericsoftware.kryo.util.IntArray.add(IntArray.java:61)
>         at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:800)
>         at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:655)
>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:231)
>         at org.apache.flink.cep.nfa.sharedbuffer.Lockable$LockableTypeSerializer.copy(Lockable.java:120)
>         at org.apache.flink.cep.nfa.sharedbuffer.Lockable$LockableTypeSerializer.copy(Lockable.java:95)
>         at org.apache.flink.api.common.typeutils.base.MapSerializer.copy(MapSerializer.java:113)
>         at org.apache.flink.api.common.typeutils.base.MapSerializer.copy(MapSerializer.java:49)
>         at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:287)
>         at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:311)
>         at org.apache.flink.runtime.state.heap.HeapMapState.get(HeapMapState.java:85)
>         at org.apache.flink.runtime.state.UserFacingMapState.get(UserFacingMapState.java:47)
>         at org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.lambda$materializeMatch$1(SharedBuffer.java:303)
>         ... 18 more
>
> On Fri, Sep 28, 2018 at 11:00 AM Shailesh Jain <
> shailesh.jain@stellapps.com> wrote:
>
>> Hi Dawid,
>>
>> Thanks for your time on this. The diff should have pointed out only the
>> top 3 commits, but since it did not, it is possible I did not rebase my
>> branch against 1.4.2 correctly. I'll check this out and get back to you if
>> I hit the same issue again.
>>
>> Thanks again,
>> Shailesh
>>
>> On Thu, Sep 27, 2018 at 1:00 PM Dawid Wysakowicz <dw...@apache.org>
>> wrote:
>>
>>> Hi Shailesh,
>>>
>>> I'm afraid it is going to be hard to help you, as this branch differs
>>> significantly from the 1.4.2 release (I diffed your branch against
>>> tag/release-1.4.2). Moreover, the code in the branch you've provided still
>>> does not correspond to the line numbers in the exception you posted previously.
>>> Could you check whether the problem occurs on vanilla Flink as well?
>>>
>>> Best,
>>>
>>> Dawid
>>>
>>> On 27/09/18 08:22, Shailesh Jain wrote:
>>>
>>> Hi Dawid,
>>>
>>> Yes, it is version 1.4.2. We are running vanilla Flink, but have added a
>>> couple of changes to the CEP operator specifically (top 3 commits here:
>>> https://github.com/jainshailesh/flink/commits/poc_on_1.4.2). The changes
>>> I've made to the CEP operators do not touch the checkpointing path; they just
>>> overload the operator to handle event time in a specific way.
>>>
>>> We are hitting this in production, so I'm not sure it'll be feasible to
>>> move to 1.6.0 immediately, but we will eventually.
>>>
>>> Thanks,
>>> Shailesh
>>>
>>> On Wed, Sep 26, 2018 at 5:44 PM Dawid Wysakowicz <dw...@apache.org>
>>> wrote:
>>>
>>>> Hi Shailesh,
>>>>
>>>> Are you sure you are using version 1.4.2? Do you run vanilla Flink,
>>>> or have you introduced some changes? I am asking because the line numbers
>>>> in the stack trace do not align with the source code for 1.4.2.
>>>>
>>>> Also, it is a different exception from the one in the issue you've
>>>> linked, so if it is a bug, it is definitely a different one. Lastly,
>>>> I would recommend upgrading to the newest version, as we rewrote
>>>> the SharedBuffer implementation in 1.6.0.
>>>>
>>>> Best,
>>>>
>>>> Dawid
>>>>
>>>> On 26/09/18 13:50, Shailesh Jain wrote:
>>>>
>>>> Hi,
>>>>
>>>> I think I've hit the same issue on a 3-node standalone cluster (1.4.2)
>>>> using HDFS (2.8.4) for the state backend.
>>>>
>>>> 2018-09-26 17:07:39,370 INFO  org.apache.flink.runtime.taskmanager.Task                     - Attempting to fail task externally SelectCepOperator (1/1) (3bec4aa1ef2226c4e0c5ff7b3860d340).
>>>> 2018-09-26 17:07:39,370 INFO  org.apache.flink.runtime.taskmanager.Task                     - SelectCepOperator (1/1) (3bec4aa1ef2226c4e0c5ff7b3860d340) switched from RUNNING to FAILED.
>>>> AsynchronousException{java.lang.Exception: Could not materialize checkpoint 6 for operator SelectCepOperator (1/1).}
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:948)
>>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>     at java.lang.Thread.run(Thread.java:748)
>>>> Caused by: java.lang.Exception: Could not materialize checkpoint 6 for operator SelectCepOperator (1/1).
>>>>     ... 6 more
>>>> Caused by: java.util.concurrent.ExecutionException: java.lang.NullPointerException
>>>>     at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>>>     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>>>     at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
>>>>     ... 5 more
>>>>     Suppressed: java.lang.Exception: Could not properly cancel managed keyed state future.
>>>>         at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:91)
>>>>         at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:976)
>>>>         at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:939)
>>>>         ... 5 more
>>>>     Caused by: java.util.concurrent.ExecutionException: java.lang.NullPointerException
>>>>         at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>>>         at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>>>         at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>>>>         at org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:66)
>>>>         at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:89)
>>>>         ... 7 more
>>>>     Caused by: java.lang.NullPointerException
>>>>         at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:954)
>>>>         at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:825)
>>>>         at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:888)
>>>>         at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:820)
>>>>         at org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot.writeMappingsInKeyGroup(CopyOnWriteStateTableSnapshot.java:196)
>>>>         at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:390)
>>>>         at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:339)
>>>>         at org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources.call(AbstractAsyncCallableWithResources.java:75)
>>>>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>         at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
>>>>         at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
>>>>         ... 5 more
>>>>     [CIRCULAR REFERENCE:java.lang.NullPointerException]
>>>>
>>>> Any ideas why I'm hitting this, especially since FLINK-7756
>>>> (https://issues.apache.org/jira/browse/FLINK-7756) says it has been
>>>> fixed in 1.4.2?
>>>>
>>>> On Sat, Nov 4, 2017 at 12:34 AM Federico D'Ambrosio <
>>>> federico.dambrosio@smartlab.ws> wrote:
>>>>
>>>>> Thank you very much for your quick response, Kostas!
>>>>>
>>>>> Cheers,
>>>>> Federico
>>>>>
>>>>> 2017-11-03 16:26 GMT+01:00 Kostas Kloudas <k.kloudas@data-artisans.com
>>>>> >:
>>>>>
>>>>>> Hi Federico,
>>>>>>
>>>>>> Thanks for trying it out!
>>>>>> Great to hear that your problem was fixed!
>>>>>>
>>>>>> The feature freeze for the release is next week, and I
>>>>>> would expect 1 or 2 more weeks of testing.
>>>>>> So I would say in about 2.5 weeks, though this is of course subject
>>>>>> to any issues we find during testing.
>>>>>>
>>>>>> Cheers,
>>>>>> Kostas
>>>>>>
>>>>>> On Nov 3, 2017, at 4:22 PM, Federico D'Ambrosio <
>>>>>> federico.dambrosio@smartlab.ws> wrote:
>>>>>>
>>>>>> Hi Kostas,
>>>>>>
>>>>>> I just tried running the same job with 1.4-SNAPSHOT for 10 minutes
>>>>>> and it didn't crash, so it was the same underlying issue as the JIRA
>>>>>> you linked.
>>>>>>
>>>>>> Do you happen to know when the stable 1.4 release is expected?
>>>>>>
>>>>>> Thank you very much,
>>>>>> Federico
>>>>>>
>>>>>> 2017-11-03 15:25 GMT+01:00 Kostas Kloudas <
>>>>>> k.kloudas@data-artisans.com>:
>>>>>>
>>>>>>> Perfect! Thanks a lot!
>>>>>>>
>>>>>>> Kostas
>>>>>>>
>>>>>>> On Nov 3, 2017, at 3:23 PM, Federico D'Ambrosio <
>>>>>>> federico.dambrosio@smartlab.ws> wrote:
>>>>>>>
>>>>>>> Hi Kostas,
>>>>>>>
>>>>>>> Yes, I'm using 1.3.2. I'll try the current master and get back
>>>>>>> to you.
>>>>>>>
>>>>>>> 2017-11-03 15:21 GMT+01:00 Kostas Kloudas <
>>>>>>> k.kloudas@data-artisans.com>:
>>>>>>>
>>>>>>>> Hi Federico,
>>>>>>>>
>>>>>>>> I assume that you are using Flink 1.3, right?
>>>>>>>>
>>>>>>>> In this case, in 1.4 we have fixed a bug that seems similar to your
>>>>>>>> case:
>>>>>>>> https://issues.apache.org/jira/browse/FLINK-7756
>>>>>>>>
>>>>>>>> Could you try the current master to see if it fixes your problem?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Kostas
>>>>>>>>
>>>>>>>> On Nov 3, 2017, at 3:12 PM, Federico D'Ambrosio <
>>>>>>>> federico.dambrosio@smartlab.ws> wrote:
>>>>>>>>
>>>>>>>>  Could not find id for
>>>>>>>> entry:
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Federico D'Ambrosio
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Federico D'Ambrosio
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Federico D'Ambrosio
>>>>>
>>>>
>>>>
>>>

Re: FlinkCEP, circular references and checkpointing failures

Posted by Dawid Wysakowicz <dw...@apache.org>.
This is a problem with serializing your events using Kryo. I'm adding
Gordon to cc, as he has recently been working on serializers. He might give
you more insight into what is going wrong.

Best,

Dawid
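
Dawid's diagnosis points at Kryo, which Flink falls back to for event types it cannot analyse as POJOs (or as another supported type). One way to narrow this down is to check whether the event class follows Flink's documented POJO rules: a public, standalone class with a public no-argument constructor, whose non-static, non-transient fields are either public and non-final or have public getters and setters. The sketch below is a rough reflection-based approximation of those rules in plain Java; it is not Flink's type extractor, it ignores field types and other details, and PojoRuleCheck, GoodEvent and BadEvent are hypothetical names. Inside a Flink job, ExecutionConfig#disableGenericTypes() should make the job fail early whenever a type would be serialized via Kryo, which is the more authoritative check.

```java
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

public class PojoRuleCheck {

    /** Returns the rule violations found for clazz; an empty list means none were detected. */
    public static List<String> violations(Class<?> clazz) {
        List<String> problems = new ArrayList<>();
        int mods = clazz.getModifiers();
        if (!Modifier.isPublic(mods)) {
            problems.add("class is not public");
        }
        if (clazz.isMemberClass() && !Modifier.isStatic(mods)) {
            problems.add("class is a non-static inner class");
        }
        try {
            Constructor<?> ctor = clazz.getDeclaredConstructor();
            if (!Modifier.isPublic(ctor.getModifiers())) {
                problems.add("no-argument constructor is not public");
            }
        } catch (NoSuchMethodException e) {
            problems.add("no no-argument constructor");
        }
        // The rules also cover inherited fields, so walk up the superclass chain.
        for (Class<?> c = clazz; c != null && c != Object.class; c = c.getSuperclass()) {
            for (Field f : c.getDeclaredFields()) {
                int fm = f.getModifiers();
                if (f.isSynthetic() || Modifier.isStatic(fm) || Modifier.isTransient(fm)) {
                    continue;
                }
                boolean publicMutable = Modifier.isPublic(fm) && !Modifier.isFinal(fm);
                if (!publicMutable && !hasGetterAndSetter(clazz, f)) {
                    problems.add("field '" + f.getName() + "' is not public/non-final and lacks a public getter and setter");
                }
            }
        }
        return problems;
    }

    private static boolean hasGetterAndSetter(Class<?> clazz, Field f) {
        String name = f.getName();
        String suffix = Character.toUpperCase(name.charAt(0)) + name.substring(1);
        boolean getter = hasPublicMethod(clazz, "get" + suffix, 0)
                || (f.getType() == boolean.class && hasPublicMethod(clazz, "is" + suffix, 0));
        return getter && hasPublicMethod(clazz, "set" + suffix, 1);
    }

    private static boolean hasPublicMethod(Class<?> clazz, String name, int paramCount) {
        for (Method m : clazz.getMethods()) { // public methods, including inherited ones
            if (m.getName().equals(name) && m.getParameterCount() == paramCount) {
                return true;
            }
        }
        return false;
    }

    /** Hypothetical event type that follows the rules. */
    public static class GoodEvent {
        public String deviceId;
        public long timestamp;

        public GoodEvent() {}
    }

    /** Hypothetical event type that breaks three of them. */
    static class BadEvent {
        private final String id;

        BadEvent(String id) {
            this.id = id;
        }

        public String getId() {
            return id;
        }
    }

    public static void main(String[] args) {
        System.out.println("GoodEvent: " + violations(GoodEvent.class));
        System.out.println("BadEvent:  " + violations(BadEvent.class));
    }
}
```

GoodEvent reports no violations; BadEvent reports three (class not public, no no-argument constructor, and field 'id' with a getter but no setter). Making the event type a valid POJO only changes which serializer Flink picks; it is not a confirmed fix for the failure discussed here.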

On 25/10/2018 05:41, Shailesh Jain wrote:
> Hi Dawid,
>
> I've upgraded to flink 1.6.1 and rebased by changes against the tag
> 1.6.1, the only commit on top of 1.6 is this:
> https://github.com/jainshailesh/flink/commit/797e3c4af5b28263fd98fb79daaba97cabf3392c
>
> I ran two separate identical jobs (with and without checkpointing
> enabled), I'm hitting a ArrayIndexOutOfBoundsException (and sometimes
> NPE) *only when checkpointing (HDFS backend) is enabled*, with the
> below stack trace.
>
> I did see a similar problem with different operators here
> (http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/IndexOutOfBoundsException-on-deserialization-after-updating-to-1-6-1-td23933.html).
> Is this a known issue which is getting addressed?
>
> Any ideas on what could be causing this?
>
> Thanks,
> Shailesh
>
>
> 2018-10-24 17:04:13,365 INFO 
> org.apache.flink.runtime.taskmanager.Task                     -
> SelectCepOperatorMixedTime (1/1) - SelectCepOperatorMixedTime (1/1)
> (3d984b7919342a3886593401088ca2cd) switched from RUNNING to FAILED.
> org.apache.flink.util.FlinkRuntimeException: Failure happened in
> filter function.
>         at org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:731)
>         at org.apache.flink.cep.nfa.NFA.computeNextStates(NFA.java:541)
>         at org.apache.flink.cep.nfa.NFA.doProcess(NFA.java:284)
>         at org.apache.flink.cep.nfa.NFA.process(NFA.java:220)
>         at
> org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.processEvent(AbstractKeyedCEPPatternOperator.java:379)
>         at
> org.apache.flink.cep.operator.AbstractKeyedCEPPatternMixedTimeApproachOperator.processElement(AbstractKeyedCEPPatternMixedTimeApproachOperator.java:45)
>         at
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>         at
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>         at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.WrappingRuntimeException:
> java.lang.ArrayIndexOutOfBoundsException: -1
>         at
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.lambda$materializeMatch$1(SharedBuffer.java:305)
>         at java.util.HashMap.computeIfAbsent(HashMap.java:1127)
>         at
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.materializeMatch(SharedBuffer.java:301)
>         at
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.materializeMatch(SharedBuffer.java:291)
>         at
> org.apache.flink.cep.nfa.NFA$ConditionContext.getEventsForPattern(NFA.java:811)
>         at
> com.stellapps.contrakcep.flink.patterns.critical.AgitatorMalfunctionAfterChillingPattern$1.filter(AgitatorMalfunctionAfterChillingPattern.java:70)
>         at
> com.stellapps.contrakcep.flink.patterns.critical.AgitatorMalfunctionAfterChillingPattern$1.filter(AgitatorMalfunctionAfterChillingPattern.java:62)
>         at org.apache.flink.cep.nfa.NFA.checkFilterCondition(NFA.java:742)
>         at org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:716)
>         ... 10 more
> Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
>         at com.esotericsoftware.kryo.util.IntArray.add(IntArray.java:61)
>         at
> com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:800)
>         at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:655)
>         at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:231)
>         at
> org.apache.flink.cep.nfa.sharedbuffer.Lockable$LockableTypeSerializer.copy(Lockable.java:120)
>         at
> org.apache.flink.cep.nfa.sharedbuffer.Lockable$LockableTypeSerializer.copy(Lockable.java:95)
>         at
> org.apache.flink.api.common.typeutils.base.MapSerializer.copy(MapSerializer.java:113)
>         at
> org.apache.flink.api.common.typeutils.base.MapSerializer.copy(MapSerializer.java:49)
>         at
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:287)
>         at
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:311)
>         at
> org.apache.flink.runtime.state.heap.HeapMapState.get(HeapMapState.java:85)
>         at
> org.apache.flink.runtime.state.UserFacingMapState.get(UserFacingMapState.java:47)
>         at
> org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.lambda$materializeMatch$1(SharedBuffer.java:303)
>         ... 18 more
>
> On Fri, Sep 28, 2018 at 11:00 AM Shailesh Jain
> <shailesh.jain@stellapps.com <ma...@stellapps.com>> wrote:
>
>     Hi Dawid,
>
>     Thanks for your time on this. The diff should have pointed out
>     only the top 3 commits, but since it did not, it is possible I did
>     not rebase my branch against 1.4.2 correctly. I'll check this out
>     and get back to you if I hit the same issue again.
>
>     Thanks again,
>     Shailesh
>
>     On Thu, Sep 27, 2018 at 1:00 PM Dawid Wysakowicz
>     <dwysakowicz@apache.org <ma...@apache.org>> wrote:
>
>         Hi Shailesh,
>
>         I am afraid it is gonna be hard to help you, as this branch
>         differs significantly from 1.4.2 release (I've done diff
>         across your branch and tag/release-1.4.2). Moreover the code
>         in the branch you've provided still does not correspond to the
>         lines in the exception you've posted previously. Could you
>         check if the problem occurs on vanilla flink as well?
>
>         Best,
>
>         Dawid
>
>
>         On 27/09/18 08:22, Shailesh Jain wrote:
>>         Hi Dawid,
>>
>>         Yes, it is version 1.4.2. We are running vanilla flink, but
>>         have added a couple of changes in the CEP operator
>>         specifically (top 3 commits here:
>>         https://github.com/jainshailesh/flink/commits/poc_on_1.4.2).
>>         Changes I've made to CEP operators do not touch the
>>         checkpointing path, just overloading the operator for a
>>         specific way of handling event time.
>>
>>         We are hitting this in production, so I'm not sure it'll be
>>         feasible to move to 1.6.0 immediately, but eventually yes.
>>
>>         Thanks,
>>         Shailesh
>>
>>         On Wed, Sep 26, 2018 at 5:44 PM Dawid Wysakowicz
>>         <dwysakowicz@apache.org <ma...@apache.org>> wrote:
>>
>>             Hi Shailesh,
>>
>>             Are you sure you are using version 1.4.2? Do you run a
>>             vanilla flink, or have you introduced some changes? I am
>>             asking cause the lines in stacktrace does not align with
>>             the source code for 1.4.2.
>>
>>             Also it is a different exception than the one in the
>>             issue you've linked, so if it is a problem than it is
>>             definitely a different one. Last thing I would recommend
>>             upgrading to the newest version, as we rewritten the
>>             SharedBuffer implementation in 1.6.0.
>>
>>             Best,
>>
>>             Dawid
>>
>>
>>             On 26/09/18 13:50, Shailesh Jain wrote:
>>>             Hi,
>>>
>>>             I think I've hit this same issue on a 3 node standalone
>>>             cluster (1.4.2) using HDFS (2.8.4) as state backend.
>>>
>>>             2018-09-26 17:07:39,370 INFO 
>>>             org.apache.flink.runtime.taskmanager.Task                    
>>>             - Attempting to fail task externally SelectCepOperator
>>>             (1/1) (3bec4aa1ef2226c4e0c5ff7b3860d340).
>>>             2018-09-26 17:07:39,370 INFO 
>>>             org.apache.flink.runtime.taskmanager.Task                    
>>>             - SelectCepOperator (1/1)
>>>             (3bec4aa1ef2226c4e0c5ff7b3860d340) switched from RUNNING
>>>             to FAILED.
>>>             AsynchronousException{java.lang.Exception: Could not
>>>             materialize checkpoint 6 for operator SelectCepOperator
>>>             (1/1).}
>>>                 at
>>>             org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:948)
>>>                 at
>>>             java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>                 at
>>>             java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>                 at
>>>             java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>                 at
>>>             java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>                 at java.lang.Thread.run(Thread.java:748)
>>>             Caused by: java.lang.Exception: Could not materialize
>>>             checkpoint 6 for operator SelectCepOperator (1/1).
>>>                 ... 6 more
>>>             Caused by: java.util.concurrent.ExecutionException:
>>>             java.lang.NullPointerException
>>>                 at
>>>             java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>>                 at
>>>             java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>>                 at
>>>             org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>>>                 at
>>>             org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
>>>                 ... 5 more
>>>                 Suppressed: java.lang.Exception: Could not properly
>>>             cancel managed keyed state future.
>>>                     at
>>>             org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:91)
>>>                     at
>>>             org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:976)
>>>                     at
>>>             org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:939)
>>>                     ... 5 more
>>>                 Caused by: java.util.concurrent.ExecutionException:
>>>             java.lang.NullPointerException
>>>                     at
>>>             java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>>                     at
>>>             java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>>                     at
>>>             org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>>>                     at
>>>             org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:66)
>>>                     at
>>>             org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:89)
>>>                     ... 7 more
>>>                 Caused by: java.lang.NullPointerException
>>>                     at
>>>             org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:954)
>>>                     at
>>>             org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:825)
>>>                     at
>>>             org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:888)
>>>                     at
>>>             org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:820)
>>>                     at
>>>             org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot.writeMappingsInKeyGroup(CopyOnWriteStateTableSnapshot.java:196)
>>>                     at
>>>             org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:390)
>>>                     at
>>>             org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:339)
>>>                     at
>>>             org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources.call(AbstractAsyncCallableWithResources.java:75)
>>>                     at
>>>             java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>                     at
>>>             org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
>>>                     at
>>>             org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
>>>                     ... 5 more
>>>                 [CIRCULAR REFERENCE:java.lang.NullPointerException]
>>>
>>>             Any ideas on why I'm hitting this especially when this
>>>             (https://issues.apache.org/jira/browse/FLINK-7756) says
>>>             it has been fixed in 1.4.2 ?
>>>
>>>             On Sat, Nov 4, 2017 at 12:34 AM Federico D'Ambrosio
>>>             <federico.dambrosio@smartlab.ws
>>>             <ma...@smartlab.ws>> wrote:
>>>
>>>                 Thank you very much for your steady response, Kostas!
>>>
>>>                 Cheers,
>>>                 Federico
>>>
>>>                 2017-11-03 16:26 GMT+01:00 Kostas Kloudas
>>>                 <k.kloudas@data-artisans.com
>>>                 <ma...@data-artisans.com>>:
>>>
>>>                     Hi Federico,
>>>
>>>                     Thanks for trying it out! 
>>>                     Great to hear that your problem was fixed!
>>>
>>>                     The feature freeze for the release is going to
>>>                     be next week, and I would expect 1 or 2 more
>>>                     weeks testing.
>>>                     So I would say in 2.5 weeks. But this is of
>>>                     course subject to potential issues we may find
>>>                     during testing.
>>>
>>>                     Cheers,
>>>                     Kostas
>>>
>>>>                     On Nov 3, 2017, at 4:22 PM, Federico D'Ambrosio
>>>>                     <federico.dambrosio@smartlab.ws
>>>>                     <ma...@smartlab.ws>> wrote:
>>>>
>>>>                     Hi Kostas,
>>>>
>>>>                     I just tried running the same job with
>>>>                     1.4-SNAPSHOT for 10 minutes and it didn't
>>>>                     crash, so that was the same underlying issue of
>>>>                     the JIRA you linked.
>>>>
>>>>                     Do you happen to know when it's expected the
>>>>                     1.4 stable release?
>>>>
>>>>                     Thank you very much,
>>>>                     Federico
>>>>
>>>>                     2017-11-03 15:25 GMT+01:00 Kostas Kloudas
>>>>                     <k.kloudas@data-artisans.com
>>>>                     <ma...@data-artisans.com>>:
>>>>
>>>>                         Perfect! thanks a lot!
>>>>
>>>>                         Kostas
>>>>
>>>>>                         On Nov 3, 2017, at 3:23 PM, Federico
>>>>>                         D'Ambrosio <federico.dambrosio@smartlab.ws
>>>>>                         <ma...@smartlab.ws>>
>>>>>                         wrote:
>>>>>
>>>>>                         Hi Kostas,
>>>>>
>>>>>                         yes, I'm using 1.3.2. I'll try the current
>>>>>                         master and I'll get back to you.
>>>>>
>>>>>                         2017-11-03 15:21 GMT+01:00 Kostas Kloudas
>>>>>                         <k.kloudas@data-artisans.com
>>>>>                         <ma...@data-artisans.com>>:
>>>>>
>>>>>                             Hi Federico,
>>>>>
>>>>>                             I assume that you are using Flink 1.3,
>>>>>                             right?
>>>>>
>>>>>                             In this case, in 1.4 we have fixed a
>>>>>                             bug that seems similar to your case:
>>>>>                             https://issues.apache.org/jira/browse/FLINK-7756
>>>>>
>>>>>                             Could you try the current master to
>>>>>                             see if it fixes your problem?
>>>>>
>>>>>                             Thanks,
>>>>>                             Kostas
>>>>>
>>>>>>                             On Nov 3, 2017, at 3:12 PM, Federico
>>>>>>                             D'Ambrosio
>>>>>>                             <federico.dambrosio@smartlab.ws
>>>>>>                             <ma...@smartlab.ws>>
>>>>>>                             wrote:
>>>>>>
>>>>>>                              Could not find id for
>>>>>>                             entry:                                                        
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>                         -- 
>>>>>                         Federico D'Ambrosio
>>>>
>>>>
>>>>
>>>>
>>>>                     -- 
>>>>                     Federico D'Ambrosio
>>>
>>>
>>>
>>>
>>>                 -- 
>>>                 Federico D'Ambrosio
>>>
>>
>
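A note on the "Could not find id for entry" message quoted above, offered as an illustration rather than an account of Flink's actual code: as far as I understand the pre-1.6 SharedBufferSerializer, it first wrote every buffered entry while assigning each one an integer id, and then wrote the edges between entries as ids. An edge whose target had already been removed from the buffer (for instance when a within() window expired) would then have no id to write. The stdlib-only Java sketch below reproduces that failure mode on a toy graph; Entry, serialize and the pruning step in main are hypothetical names, not the Flink API.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of a shared buffer: entries that point back to earlier entries (hypothetical). */
public class SharedBufferIdSketch {

    static final class Entry {
        final String event;
        final List<Entry> edges = new ArrayList<>(); // earlier entries this one points to

        Entry(String event) {
            this.event = event;
        }

        @Override
        public String toString() {
            return "Entry(" + event + ")";
        }
    }

    /** Writes all live entries with an integer id each, then every edge as its target's id. */
    static byte[] serialize(List<Entry> liveEntries) {
        Map<Entry, Integer> ids = new IdentityHashMap<>();
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(liveEntries.size());
            for (Entry entry : liveEntries) {
                ids.put(entry, ids.size()); // ids are assigned in write order: 0, 1, 2, ...
                out.writeUTF(entry.event);
            }
            for (Entry entry : liveEntries) {
                out.writeInt(entry.edges.size());
                for (Entry target : entry.edges) {
                    Integer targetId = ids.get(target);
                    if (targetId == null) {
                        // Dangling edge: the target was pruned but is still referenced.
                        throw new IllegalStateException("Could not find id for entry: " + target);
                    }
                    out.writeInt(targetId);
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) {
        Entry first = new Entry("a1");
        Entry second = new Entry("a2");
        second.edges.add(first);

        // Consistent buffer: every edge target is also being written.
        System.out.println(serialize(List.of(first, second)).length); // 24

        // "first" has been pruned, but the edge from "second" was left behind.
        try {
            serialize(List.of(second));
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // Could not find id for entry: Entry(a1)
        }
    }
}
```

In this toy model the serializer behaves correctly; it is handed an inconsistent graph. So, in the model at least, the fix belongs in whatever prunes entries, not in the serialization code.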

Re: FlinkCEP, circular references and checkpointing failures

Posted by Shailesh Jain <sh...@stellapps.com>.
Hi Dawid,

I've upgraded to Flink 1.6.1 and rebased my changes against the 1.6.1 tag;
the only commit on top of it is this one:
https://github.com/jainshailesh/flink/commit/797e3c4af5b28263fd98fb79daaba97cabf3392c

I ran two otherwise identical jobs, one with checkpointing enabled and one
without. I'm hitting an ArrayIndexOutOfBoundsException (and sometimes an NPE)
*only when checkpointing (HDFS backend) is enabled*, with the stack trace
below.

I saw a similar problem with different operators here (
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/IndexOutOfBoundsException-on-deserialization-after-updating-to-1-6-1-td23933.html).
Is this a known issue that is being addressed?

Any ideas on what could be causing this?

Thanks,
Shailesh


2018-10-24 17:04:13,365 INFO  org.apache.flink.runtime.taskmanager.Task                     - SelectCepOperatorMixedTime (1/1) - SelectCepOperatorMixedTime (1/1) (3d984b7919342a3886593401088ca2cd) switched from RUNNING to FAILED.
org.apache.flink.util.FlinkRuntimeException: Failure happened in filter function.
        at org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:731)
        at org.apache.flink.cep.nfa.NFA.computeNextStates(NFA.java:541)
        at org.apache.flink.cep.nfa.NFA.doProcess(NFA.java:284)
        at org.apache.flink.cep.nfa.NFA.process(NFA.java:220)
        at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.processEvent(AbstractKeyedCEPPatternOperator.java:379)
        at org.apache.flink.cep.operator.AbstractKeyedCEPPatternMixedTimeApproachOperator.processElement(AbstractKeyedCEPPatternMixedTimeApproachOperator.java:45)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.WrappingRuntimeException: java.lang.ArrayIndexOutOfBoundsException: -1
        at org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.lambda$materializeMatch$1(SharedBuffer.java:305)
        at java.util.HashMap.computeIfAbsent(HashMap.java:1127)
        at org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.materializeMatch(SharedBuffer.java:301)
        at org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.materializeMatch(SharedBuffer.java:291)
        at org.apache.flink.cep.nfa.NFA$ConditionContext.getEventsForPattern(NFA.java:811)
        at com.stellapps.contrakcep.flink.patterns.critical.AgitatorMalfunctionAfterChillingPattern$1.filter(AgitatorMalfunctionAfterChillingPattern.java:70)
        at com.stellapps.contrakcep.flink.patterns.critical.AgitatorMalfunctionAfterChillingPattern$1.filter(AgitatorMalfunctionAfterChillingPattern.java:62)
        at org.apache.flink.cep.nfa.NFA.checkFilterCondition(NFA.java:742)
        at org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:716)
        ... 10 more
Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
        at com.esotericsoftware.kryo.util.IntArray.add(IntArray.java:61)
        at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:800)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:655)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:231)
        at org.apache.flink.cep.nfa.sharedbuffer.Lockable$LockableTypeSerializer.copy(Lockable.java:120)
        at org.apache.flink.cep.nfa.sharedbuffer.Lockable$LockableTypeSerializer.copy(Lockable.java:95)
        at org.apache.flink.api.common.typeutils.base.MapSerializer.copy(MapSerializer.java:113)
        at org.apache.flink.api.common.typeutils.base.MapSerializer.copy(MapSerializer.java:49)
        at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:287)
        at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:311)
        at org.apache.flink.runtime.state.heap.HeapMapState.get(HeapMapState.java:85)
        at org.apache.flink.runtime.state.UserFacingMapState.get(UserFacingMapState.java:47)
        at org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.lambda$materializeMatch$1(SharedBuffer.java:303)
        ... 18 more
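One way to read the trace above, as a hypothesis rather than a diagnosis: with the heap keyed state backend (the trace shows HeapMapState and CopyOnWriteStateTable), CopyOnWriteStateTable.get deep-copies a value through its TypeSerializer when a pending asynchronous snapshot still references the current version. That would explain why MapSerializer.copy, Lockable$LockableTypeSerializer.copy and KryoSerializer.copy show up underneath an ordinary MapState.get, and why the failure only appears with checkpointing enabled. The sketch below models just that versioning rule with the Java standard library; CowTable, snapshot(), releaseSnapshot() and the UnaryOperator standing in for TypeSerializer#copy are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

/** Stdlib-only model of copy-on-write keyed state with snapshot versions (hypothetical names). */
public class CopyOnWriteSketch {

    static final class Entry<V> {
        V state;
        int stateVersion;

        Entry(V state, int stateVersion) {
            this.state = state;
            this.stateVersion = stateVersion;
        }
    }

    static final class CowTable<K, V> {
        private final Map<K, Entry<V>> entries = new HashMap<>();
        private final UnaryOperator<V> deepCopy; // stands in for TypeSerializer#copy
        private int tableVersion = 0;
        private int highestRequiredSnapshotVersion = 0; // 0 means no snapshot is pending
        int copiesMade = 0;

        CowTable(UnaryOperator<V> deepCopy) {
            this.deepCopy = deepCopy;
        }

        void put(K key, V value) {
            entries.put(key, new Entry<>(value, tableVersion));
        }

        /** Starts a snapshot; the returned view shares state objects with the live table. */
        Map<K, V> snapshot() {
            Map<K, V> view = new HashMap<>();
            entries.forEach((key, entry) -> view.put(key, entry.state));
            tableVersion++;
            highestRequiredSnapshotVersion = tableVersion;
            return view;
        }

        /** Called once the asynchronous snapshot has been written out. */
        void releaseSnapshot() {
            highestRequiredSnapshotVersion = 0;
        }

        V get(K key) {
            Entry<V> entry = entries.get(key);
            if (entry == null) {
                return null;
            }
            if (entry.stateVersion < highestRequiredSnapshotVersion) {
                // The pending snapshot still holds entry.state, so give the caller a private deep
                // copy. A broken copy (e.g. a faulty serializer) would fail right here, on a read.
                entry.state = deepCopy.apply(entry.state);
                entry.stateVersion = tableVersion;
                copiesMade++;
            }
            return entry.state;
        }
    }

    public static void main(String[] args) {
        CowTable<String, List<Integer>> table = new CowTable<>(list -> new ArrayList<>(list));
        table.put("key", new ArrayList<>(List.of(1)));

        Map<String, List<Integer>> pending = table.snapshot();
        table.get("key").add(2); // first read after the snapshot makes a copy
        table.get("key").add(3); // already copied for this version, no second copy

        System.out.println(pending.get("key")); // [1]
        System.out.println(table.get("key"));   // [1, 2, 3]
        System.out.println(table.copiesMade);   // 1

        table.releaseSnapshot();
    }
}
```

Because the copy only happens while a snapshot is pending, a defect in the copy path stays invisible in a job without checkpointing, which would be consistent with the with/without comparison described in the message above.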

On Fri, Sep 28, 2018 at 11:00 AM Shailesh Jain <sh...@stellapps.com>
wrote:

> Hi Dawid,
>
> Thanks for your time on this. The diff should have pointed out only the
> top 3 commits, but since it did not, it is possible I did not rebase my
> branch against 1.4.2 correctly. I'll check this out and get back to you if
> I hit the same issue again.
>
> Thanks again,
> Shailesh
>
> On Thu, Sep 27, 2018 at 1:00 PM Dawid Wysakowicz <dw...@apache.org>
> wrote:
>
>> Hi Shailesh,
>>
>> I am afraid it is going to be hard to help you, as this branch differs
>> significantly from the 1.4.2 release (I've diffed your branch against
>> tag/release-1.4.2). Moreover, the code in the branch you've provided still
>> does not correspond to the line numbers in the exception you posted earlier.
>> Could you check whether the problem occurs on vanilla Flink as well?
>>
>> Best,
>>
>> Dawid
>>
>> On 27/09/18 08:22, Shailesh Jain wrote:
>>
>> Hi Dawid,
>>
>> Yes, it is version 1.4.2. We are running vanilla Flink, except for a
>> couple of changes to the CEP operator (the top 3 commits here:
>> https://github.com/jainshailesh/flink/commits/poc_on_1.4.2). The changes
>> I've made to the CEP operators do not touch the checkpointing path; they
>> just overload the operator to handle event time in a specific way.
>>
>> We are hitting this in production, so I'm not sure it will be feasible to
>> move to 1.6.0 immediately, but eventually, yes.
>>
>> Thanks,
>> Shailesh
>>
>> On Wed, Sep 26, 2018 at 5:44 PM Dawid Wysakowicz <dw...@apache.org>
>> wrote:
>>
>>> Hi Shailesh,
>>>
>>> Are you sure you are using version 1.4.2? Do you run vanilla Flink, or
>>> have you introduced some changes? I am asking because the line numbers in
>>> the stack trace do not align with the source code for 1.4.2.
>>>
>>> Also, it is a different exception from the one in the issue you've
>>> linked, so if it is a bug, it is definitely a different one. Lastly, I
>>> would recommend upgrading to the newest version, as we rewrote the
>>> SharedBuffer implementation in 1.6.0.
>>>
>>> Best,
>>>
>>> Dawid
>>>
>>> On 26/09/18 13:50, Shailesh Jain wrote:
>>>
>>> Hi,
>>>
>>> I think I've hit this same issue on a 3-node standalone cluster (Flink
>>> 1.4.2) using HDFS (2.8.4) as the state backend.
>>>
>>> 2018-09-26 17:07:39,370 INFO  org.apache.flink.runtime.taskmanager.Task                     - Attempting to fail task externally SelectCepOperator (1/1) (3bec4aa1ef2226c4e0c5ff7b3860d340).
>>> 2018-09-26 17:07:39,370 INFO  org.apache.flink.runtime.taskmanager.Task                     - SelectCepOperator (1/1) (3bec4aa1ef2226c4e0c5ff7b3860d340) switched from RUNNING to FAILED.
>>> AsynchronousException{java.lang.Exception: Could not materialize checkpoint 6 for operator SelectCepOperator (1/1).}
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:948)
>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>     at java.lang.Thread.run(Thread.java:748)
>>> Caused by: java.lang.Exception: Could not materialize checkpoint 6 for operator SelectCepOperator (1/1).
>>>     ... 6 more
>>> Caused by: java.util.concurrent.ExecutionException: java.lang.NullPointerException
>>>     at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>>     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>>     at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
>>>     ... 5 more
>>>     Suppressed: java.lang.Exception: Could not properly cancel managed keyed state future.
>>>         at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:91)
>>>         at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:976)
>>>         at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:939)
>>>         ... 5 more
>>>     Caused by: java.util.concurrent.ExecutionException: java.lang.NullPointerException
>>>         at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>>         at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>>         at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>>>         at org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:66)
>>>         at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:89)
>>>         ... 7 more
>>>     Caused by: java.lang.NullPointerException
>>>         at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:954)
>>>         at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:825)
>>>         at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:888)
>>>         at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:820)
>>>         at org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot.writeMappingsInKeyGroup(CopyOnWriteStateTableSnapshot.java:196)
>>>         at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:390)
>>>         at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:339)
>>>         at org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources.call(AbstractAsyncCallableWithResources.java:75)
>>>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>         at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
>>>         at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
>>>         ... 5 more
>>>     [CIRCULAR REFERENCE:java.lang.NullPointerException]
>>>
>>> Any ideas why I'm hitting this, especially since
>>> https://issues.apache.org/jira/browse/FLINK-7756 says it has been fixed
>>> in 1.4.2?
>>>
>>> On Sat, Nov 4, 2017 at 12:34 AM Federico D'Ambrosio <
>>> federico.dambrosio@smartlab.ws> wrote:
>>>
>>>> Thank you very much for your quick response, Kostas!
>>>>
>>>> Cheers,
>>>> Federico
>>>>
>>>> 2017-11-03 16:26 GMT+01:00 Kostas Kloudas <k....@data-artisans.com>
>>>> :
>>>>
>>>>> Hi Federico,
>>>>>
>>>>> Thanks for trying it out!
>>>>> Great to hear that your problem was fixed!
>>>>>
>>>>> The feature freeze for the release is going to be next week, and I
>>>>> would expect 1 or 2 more weeks of testing.
>>>>> So I would say in about 2.5 weeks, though this is of course subject to
>>>>> potential issues we may find during testing.
>>>>>
>>>>> Cheers,
>>>>> Kostas
>>>>>
>>>>> On Nov 3, 2017, at 4:22 PM, Federico D'Ambrosio <
>>>>> federico.dambrosio@smartlab.ws> wrote:
>>>>>
>>>>> Hi Kostas,
>>>>>
>>>>> I just ran the same job with 1.4-SNAPSHOT for 10 minutes and it
>>>>> didn't crash, so it was the same underlying issue as the JIRA you
>>>>> linked.
>>>>>
>>>>> Do you happen to know when the stable 1.4 release is expected?
>>>>>
>>>>> Thank you very much,
>>>>> Federico
>>>>>
>>>>> 2017-11-03 15:25 GMT+01:00 Kostas Kloudas <k.kloudas@data-artisans.com>:
>>>>>
>>>>>> Perfect! thanks a lot!
>>>>>>
>>>>>> Kostas
>>>>>>
>>>>>> On Nov 3, 2017, at 3:23 PM, Federico D'Ambrosio <
>>>>>> federico.dambrosio@smartlab.ws> wrote:
>>>>>>
>>>>>> Hi Kostas,
>>>>>>
>>>>>> Yes, I'm using 1.3.2. I'll try the current master and get back
>>>>>> to you.
>>>>>>
>>>>>> 2017-11-03 15:21 GMT+01:00 Kostas Kloudas <
>>>>>> k.kloudas@data-artisans.com>:
>>>>>>
>>>>>>> Hi Federico,
>>>>>>>
>>>>>>> I assume that you are using Flink 1.3, right?
>>>>>>>
>>>>>>> In this case, in 1.4 we have fixed a bug that seems similar to your
>>>>>>> case:
>>>>>>> https://issues.apache.org/jira/browse/FLINK-7756
>>>>>>>
>>>>>>> Could you try the current master to see if it fixes your problem?
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Kostas
>>>>>>>
>>>>>>> On Nov 3, 2017, at 3:12 PM, Federico D'Ambrosio <
>>>>>>> federico.dambrosio@smartlab.ws> wrote:
>>>>>>>
>>>>>>>  Could not find id for
>>>>>>> entry:
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Federico D'Ambrosio
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Federico D'Ambrosio
>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Federico D'Ambrosio
>>>>
>>>
>>>
>>
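Dawid's suggestion to upgrade refers to the SharedBuffer rewrite in 1.6.0, and the Lockable$LockableTypeSerializer frames in the 1.6.1 trace come from that new implementation. My understanding, to be checked against the Flink sources, is that events and buffer nodes are kept in keyed MapState wrapped in a reference-counted Lockable and are removed once the count drops to zero. The stdlib-only sketch below models only that counting idea; RefCounted and RefCountedStore are hypothetical names, and they use a HashMap where Flink uses keyed state.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of reference-counted entries in a shared buffer. */
public class RefCountSketch {

    static final class RefCounted<T> {
        final T element;
        private int refCount;

        RefCounted(T element) {
            this.element = element;
        }

        void lock() {
            refCount++;
        }

        /** Drops one reference and returns true when none remain. */
        boolean release() {
            if (refCount <= 0) {
                throw new IllegalStateException("release() without a matching lock(): " + element);
            }
            refCount--;
            return refCount == 0;
        }
    }

    static final class RefCountedStore<K, T> {
        private final Map<K, RefCounted<T>> entries = new HashMap<>();

        /** Registers the element on first use and takes one reference to it. */
        void acquire(K key, T element) {
            entries.computeIfAbsent(key, k -> new RefCounted<>(element)).lock();
        }

        /** Drops one reference; the entry is removed together with its last reference. */
        void release(K key) {
            RefCounted<T> entry = entries.get(key);
            if (entry == null) {
                throw new IllegalStateException("No entry for key " + key);
            }
            if (entry.release()) {
                entries.remove(key);
            }
        }

        boolean contains(K key) {
            return entries.containsKey(key);
        }

        int size() {
            return entries.size();
        }
    }

    public static void main(String[] args) {
        RefCountedStore<Integer, String> events = new RefCountedStore<>();
        events.acquire(1, "flying@t1"); // referenced by partial match A
        events.acquire(1, "flying@t1"); // also referenced by partial match B

        events.release(1); // partial match A is discarded
        System.out.println(events.contains(1)); // true, B still holds a reference

        events.release(1); // partial match B is discarded
        System.out.println(events.contains(1)); // false
    }
}
```

Throwing on an unmatched release() is a design choice of the sketch, made so that a bookkeeping error surfaces immediately instead of silently leaving a dangling or prematurely removed entry.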

Re: FlinkCEP, circular references and checkpointing failures

Posted by Shailesh Jain <sh...@stellapps.com>.
Hi Dawid,

Thanks for your time on this. The diff should have pointed out only the top
3 commits, but since it did not, it is possible I did not rebase my branch
against 1.4.2 correctly. I'll check this out and get back to you if I hit
the same issue again.

Thanks again,
Shailesh

Re: FlinkCEP, circular references and checkpointing failures

Posted by Dawid Wysakowicz <dw...@apache.org>.
Hi Shailesh,

I am afraid it is going to be hard to help you, as this branch differs
significantly from the 1.4.2 release (I've diffed your branch against
tag/release-1.4.2). Moreover, the code in the branch you've provided
still does not correspond to the line numbers in the exception you posted
earlier. Could you check whether the problem occurs on vanilla Flink as well?

Best,

Dawid


Re: FlinkCEP, circular references and checkpointing failures

Posted by Shailesh Jain <sh...@stellapps.com>.
Hi Dawid,

Yes, it is version 1.4.2. We are running vanilla Flink, except for a
couple of changes to the CEP operator (the top 3 commits here:
https://github.com/jainshailesh/flink/commits/poc_on_1.4.2). The changes I've
made to the CEP operators do not touch the checkpointing path; they just
overload the operator to handle event time in a specific way.

We are hitting this in production, so I'm not sure it will be feasible to
move to 1.6.0 immediately, but eventually, yes.

Thanks,
Shailesh

On Wed, Sep 26, 2018 at 5:44 PM Dawid Wysakowicz <dw...@apache.org>
wrote:

> Hi Shailesh,
>
> Are you sure you are using version 1.4.2? Do you run a vanilla Flink, or
> have you introduced some changes? I am asking because the lines in the stack
> trace do not align with the source code for 1.4.2.
>
> Also, it is a different exception than the one in the issue you've linked,
> so if it is a problem, then it is definitely a different one. Lastly, I
> would recommend upgrading to the newest version, as we rewrote the
> SharedBuffer implementation in 1.6.0.
>
> Best,
>
> Dawid

Re: FlinkCEP, circular references and checkpointing failures

Posted by Dawid Wysakowicz <dw...@apache.org>.
Hi Shailesh,

Are you sure you are using version 1.4.2? Do you run a vanilla Flink, or
have you introduced some changes? I am asking because the lines in the
stack trace do not align with the source code for 1.4.2.

Also, it is a different exception than the one in the issue you've
linked, so if it is a problem, then it is definitely a different one.
Lastly, I would recommend upgrading to the newest version, as we
rewrote the SharedBuffer implementation in 1.6.0.

Best,

Dawid
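For a Scala job built with sbt, like the one in the original report, upgrading to 1.6.0 as recommended here would mostly mean moving every Flink module to the same newer version. The following is a minimal sketch, assuming sbt and Scala 2.11; the module list and the Scala patch version are assumptions and depend on the actual job:

```scala
// build.sbt (sketch): keep all Flink modules on a single version so that
// the CEP library matches the runtime. The module list is illustrative.
scalaVersion := "2.11.12"

val flinkVersion = "1.6.0"

libraryDependencies ++= Seq(
  // Supplied by the Flink cluster at runtime, so not bundled into the job jar
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
  // FlinkCEP Scala API; bundled with the job
  "org.apache.flink" %% "flink-cep-scala" % flinkVersion
)
```

The cluster's own Flink distribution would need to be upgraded to the same version as well; bumping only the job's dependencies is not enough.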


On 26/09/18 13:50, Shailesh Jain wrote:
> Hi,
>
> I think I've hit this same issue on a 3-node standalone cluster
> (1.4.2) using HDFS (2.8.4) as the state backend.
>
> 2018-09-26 17:07:39,370 INFO  org.apache.flink.runtime.taskmanager.Task                     - Attempting to fail task externally SelectCepOperator (1/1) (3bec4aa1ef2226c4e0c5ff7b3860d340).
> 2018-09-26 17:07:39,370 INFO  org.apache.flink.runtime.taskmanager.Task                     - SelectCepOperator (1/1) (3bec4aa1ef2226c4e0c5ff7b3860d340) switched from RUNNING to FAILED.
> AsynchronousException{java.lang.Exception: Could not materialize checkpoint 6 for operator SelectCepOperator (1/1).}
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:948)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.Exception: Could not materialize checkpoint 6 for operator SelectCepOperator (1/1).
>     ... 6 more
> Caused by: java.util.concurrent.ExecutionException: java.lang.NullPointerException
>     at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>     at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
>     ... 5 more
>     Suppressed: java.lang.Exception: Could not properly cancel managed keyed state future.
>         at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:91)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:976)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:939)
>         ... 5 more
>     Caused by: java.util.concurrent.ExecutionException: java.lang.NullPointerException
>         at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>         at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>         at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>         at org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:66)
>         at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:89)
>         ... 7 more
>     Caused by: java.lang.NullPointerException
>         at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:954)
>         at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:825)
>         at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:888)
>         at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:820)
>         at org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot.writeMappingsInKeyGroup(CopyOnWriteStateTableSnapshot.java:196)
>         at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:390)
>         at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:339)
>         at org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources.call(AbstractAsyncCallableWithResources.java:75)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
>         ... 5 more
>     [CIRCULAR REFERENCE:java.lang.NullPointerException]
>
> Any ideas on why I'm hitting this, especially since FLINK-7756
> (https://issues.apache.org/jira/browse/FLINK-7756) says it has been
> fixed in 1.4.2?


Re: FlinkCEP, circular references and checkpointing failures

Posted by Shailesh Jain <sh...@stellapps.com>.
Hi,

I think I've hit this same issue on a 3-node standalone cluster (1.4.2)
using HDFS (2.8.4) as the state backend.

2018-09-26 17:07:39,370 INFO  org.apache.flink.runtime.taskmanager.Task                     - Attempting to fail task externally SelectCepOperator (1/1) (3bec4aa1ef2226c4e0c5ff7b3860d340).
2018-09-26 17:07:39,370 INFO  org.apache.flink.runtime.taskmanager.Task                     - SelectCepOperator (1/1) (3bec4aa1ef2226c4e0c5ff7b3860d340) switched from RUNNING to FAILED.
AsynchronousException{java.lang.Exception: Could not materialize checkpoint 6 for operator SelectCepOperator (1/1).}
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:948)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not materialize checkpoint 6 for operator SelectCepOperator (1/1).
    ... 6 more
Caused by: java.util.concurrent.ExecutionException: java.lang.NullPointerException
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
    ... 5 more
    Suppressed: java.lang.Exception: Could not properly cancel managed keyed state future.
        at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:91)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:976)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:939)
        ... 5 more
    Caused by: java.util.concurrent.ExecutionException: java.lang.NullPointerException
        at java.util.concurrent.FutureTask.report(FutureTask.java:122)
        at java.util.concurrent.FutureTask.get(FutureTask.java:192)
        at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
        at org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:66)
        at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:89)
        ... 7 more
    Caused by: java.lang.NullPointerException
        at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:954)
        at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:825)
        at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:888)
        at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:820)
        at org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot.writeMappingsInKeyGroup(CopyOnWriteStateTableSnapshot.java:196)
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:390)
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:339)
        at org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources.call(AbstractAsyncCallableWithResources.java:75)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
        ... 5 more
    [CIRCULAR REFERENCE:java.lang.NullPointerException]

Any ideas on why I'm hitting this, especially since FLINK-7756
(https://issues.apache.org/jira/browse/FLINK-7756) says it has been fixed in
1.4.2?

On Sat, Nov 4, 2017 at 12:34 AM Federico D'Ambrosio <
federico.dambrosio@smartlab.ws> wrote:

> Thank you very much for your steady response, Kostas!
>
> Cheers,
> Federico

Re: FlinkCEP, circular references and checkpointing failures

Posted by Federico D'Ambrosio <fe...@smartlab.ws>.
Thank you very much for your steady response, Kostas!

Cheers,
Federico

2017-11-03 16:26 GMT+01:00 Kostas Kloudas <k....@data-artisans.com>:

> Hi Federico,
>
> Thanks for trying it out!
> Great to hear that your problem was fixed!
>
> The feature freeze for the release is going to be next week, and I would
> expect 1 or 2 more weeks of testing.
> So I would say in 2.5 weeks. But this is of course subject to potential
> issues we may find during testing.
>
> Cheers,
> Kostas


-- 
Federico D'Ambrosio

Re: FlinkCEP, circular references and checkpointing failures

Posted by Kostas Kloudas <k....@data-artisans.com>.
Hi Federico,

Thanks for trying it out! 
Great to hear that your problem was fixed!

The feature freeze for the release is going to be next week, and I would expect 1 or 2 more weeks of testing.
So I would say in 2.5 weeks. But this is of course subject to potential issues we may find during testing.

Cheers,
Kostas

> On Nov 3, 2017, at 4:22 PM, Federico D'Ambrosio <fe...@smartlab.ws> wrote:
> 
> Hi Kostas,
> 
> I just tried running the same job with 1.4-SNAPSHOT for 10 minutes and it didn't crash, so it was the same underlying issue as the JIRA you linked.
> 
> Do you happen to know when the stable 1.4 release is expected?
> 
> Thank you very much,
> Federico


Re: FlinkCEP, circular references and checkpointing failures

Posted by Federico D'Ambrosio <fe...@smartlab.ws>.
Hi Kostas,

I just tried running the same job with 1.4-SNAPSHOT for 10 minutes and it
didn't crash, so it was the same underlying issue as the JIRA you linked.

Do you happen to know when the stable 1.4 release is expected?

Thank you very much,
Federico

2017-11-03 15:25 GMT+01:00 Kostas Kloudas <k....@data-artisans.com>:

> Perfect! Thanks a lot!
>
> Kostas


-- 
Federico D'Ambrosio

Re: FlinkCEP, circular references and checkpointing failures

Posted by Kostas Kloudas <k....@data-artisans.com>.
Perfect! Thanks a lot!

Kostas

> On Nov 3, 2017, at 3:23 PM, Federico D'Ambrosio <fe...@smartlab.ws> wrote:
> 
> Hi Kostas, 
> 
> Yes, I'm using 1.3.2. I'll try the current master and I'll get back to you.


Re: FlinkCEP, circular references and checkpointing failures

Posted by Federico D'Ambrosio <fe...@smartlab.ws>.
Hi Kostas,

Yes, I'm using 1.3.2. I'll try the current master and I'll get back to you.

2017-11-03 15:21 GMT+01:00 Kostas Kloudas <k....@data-artisans.com>:

> Hi Federico,
>
> I assume that you are using Flink 1.3, right?
>
> If so, in 1.4 we fixed a bug that seems similar to your case:
> https://issues.apache.org/jira/browse/FLINK-7756
>
> Could you try the current master to see if it fixes your problem?
>
> Thanks,
> Kostas


-- 
Federico D'Ambrosio

Re: FlinkCEP, circular references and checkpointing failures

Posted by Kostas Kloudas <k....@data-artisans.com>.
Hi Federico,

I assume that you are using Flink 1.3, right?

If so, in 1.4 we fixed a bug that seems similar to your case:
https://issues.apache.org/jira/browse/FLINK-7756

Could you try the current master to see if it fixes your problem?

Thanks,
Kostas

> On Nov 3, 2017, at 3:12 PM, Federico D'Ambrosio <fe...@smartlab.ws> wrote:
> 
>  Could not find id for entry: