You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Sridhar Chellappa <fl...@gmail.com> on 2017/09/15 03:08:46 UTC

StreamCorruptedException

I am running Flink 1.3.0 against Kafka 0.10. I managed to bring the flink
cluster up and have been running my flink CEP job for more than 3 hours
when I see the following exception :

The messages consumed from Kafka are protobuf messages and I use a protobuf
serializer. i have no clue as to where is this exception coming from. Can
someone help?



java.lang.IllegalStateException: Could not initialize keyed state backend.
    at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:321)
    at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:217)
    at
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:675)
    at
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:662)
    at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:251)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.StreamCorruptedException: invalid type code: 00
    at
java.io.ObjectInputStream$BlockDataInputStream.readBlockHeader(ObjectInputStream.java:2828)
    at
java.io.ObjectInputStream$BlockDataInputStream.refill(ObjectInputStream.java:2862)
    at
java.io.ObjectInputStream$BlockDataInputStream.skipBlockData(ObjectInputStream.java:2764)
    at java.io.ObjectInputStream.skipCustomData(ObjectInputStream.java:2196)
    at
java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1838)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713)
    at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2000)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
    at
org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeCondition(NFA.java:1203)
    at
org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeStates(NFA.java:1161)
    at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:948)
    at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:839)
    at
org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders$StateTableByKeyGroupReaderV2V3.readMappingsInKeyGroup(StateTableByKeyGroupReaders.java:132)
    at
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:473)
    at
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:354)
    at
org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:771)
    at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:311)
    ... 6 more

Re: StreamCorruptedException

Posted by Kostas Kloudas <k....@data-artisans.com>.
Hi Sridhar,

From looking at your code:

1) The “KafkaDataSource” is a custom source that you implemented? Does this source buffer anything?
2) The getStreamSource2 seems to return again a "new KafkaDataSource<MyMessage1>”. Can this be a problem?
3) You are working on processing time and you are simply detecting if 2 messages of the same type came within 15min right? 
	I suppose that this could also be implemented using the times() quantifier, but this is just a matter of taste.
	Could you reduce this to a smaller duration and see if you still get a corrupted stream exception?

Thanks,
Kostas

> On Sep 27, 2017, at 5:42 AM, Sridhar Chellappa <fl...@gmail.com> wrote:
> 
> One more point to add.
> 
> I disabled checkpoints (by commenting out code that calls enableCheckpointing()) and re-ran the job this time with plenty of memory to the job manager
> 
> ~/flink-1.3.2/bin/yarn-session.sh -n 4 -jm 24576 -tm 24576 -s 2 -d
> 
> At the Jobmanager, I am still hitting:
> 
> 2017-09-25 06:46:44,066 INFO  org.apache.flink.yarn.YarnApplicationMasterRunner             -  Starting YARN ApplicationMaster / ResourceManager / JobManager (Version: 1.3.2, Rev:0399bee, Date:03.08.2017 @ 10:23:11 UTC)
> 2017-09-25 06:46:44,066 INFO  org.apache.flink.yarn.YarnApplicationMasterRunner             -  Current user: flink
> 2017-09-25 06:46:44,066 INFO  org.apache.flink.yarn.YarnApplicationMasterRunner             -  JVM: OpenJDK 64-Bit Server VM - Oracle Corporation - 1.8/25.131-b11
> 2017-09-25 06:46:44,066 INFO  org.apache.flink.yarn.YarnApplicationMasterRunner             -  Maximum heap size: 16384 MiBytes
> 2017-09-25 06:46:44,066 INFO  org.apache.flink.yarn.YarnApplicationMasterRunner             -  JAVA_HOME: /usr/lib/jvm/java-8-openjdk-amd64
> 2017-09-25 06:46:44,067 INFO  org.apache.flink.yarn.YarnApplicationMasterRunner             -  Hadoop version: 2.7.2
> 2017-09-25 06:46:44,067 INFO  org.apache.flink.yarn.YarnApplicationMasterRunner             -  JVM Options:
> 2017-09-25 06:46:44,067 INFO  org.apache.flink.yarn.YarnApplicationMasterRunner             -     -Xmx18432m
> 2017-09-25 06:46:44,067 INFO  org.apache.flink.yarn.YarnApplicationMasterRunner             -     -Dlog.file=/var/log/hadoop-yarn/userlogs/application_1506317793012_0001/container_1506317793012_0001_01_000001/jobmanager.log
> 2017-09-25 06:46:44,067 INFO  org.apache.flink.yarn.YarnApplicationMasterRunner             -     -Dlogback.configurationFile=file:logback.xml
> 2017-09-25 06:46:44,067 INFO  org.apache.flink.yarn.YarnApplicationMasterRunner             -     -Dlog4j.configuration=file:log4j.properties
> 2017-09-25 06:46:44,067 INFO  org.apache.flink.yarn.YarnApplicationMasterRunner             -  Program Arguments: (none)
> 
>                                                                                  .
>                                                                                  .
> 
> 2017-09-25 06:50:51,925 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source -> (Filter -> Map -> Map, Filter -> Map -> Map, Filter -> Map -> Map, Map -> (Filter, Filter)) (2/2) (e27860984c858738f044931e4b6a86a6) switched from DEPLOYING to RUNNING.
> 2017-09-25 13:38:54,175 INFO  org.apache.flink.runtime.blob.BlobCache                       - Created BLOB cache storage directory /tmp/blobStore-3e0b96a1-904b-4acb-b0d3-9d88f2073e97
> 2017-09-25 13:38:54,187 INFO  org.apache.flink.runtime.blob.BlobCache                       - Downloading 49efe0ad58b727ba145b86df6088111c9a90ddd6 from localhost/127.0.0.1:55550 <http://127.0.0.1:55550/>
> 2017-09-25 16:30:39,974 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - KeyedCEPPatternOperator -> Map (2/2) (e464ec796cd239a7b7fa225aaf86309a) switched from RUNNING to CANCELED.
> 2017-09-25 16:30:39,975 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source -> (Filter -> Map -> Map, Filter -> Map -> Map, Filter -> Map -> Map, Map -> (Filter, Filter)) (2/2) (e27860984c858738f044931e4b6a86a6) switched from RUNNING to FAILED.
> java.lang.OutOfMemoryError: GC overhead limit exceeded
>         at sun.reflect.GeneratedConstructorAccessor10.newInstance(Unknown Source)
>         at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>         at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>         at com.twitter.chill.Instantiators$$anonfun$normalJava$1.apply(KryoBase.scala:160)
>         at com.twitter.chill.Instantiators$$anon$1.newInstance(KryoBase.scala:123)
>         at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1061)
>         at com.esotericsoftware.kryo.serializers.FieldSerializer.createCopy(FieldSerializer.java:620)
>         at com.esotericsoftware.kryo.serializers.FieldSerializer.copy(FieldSerializer.java:624)
>         at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:176)
>         at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:239)
>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:526)
>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:575)
>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:536)
>         at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
>         at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
>         at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:575)
>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:536)
>         at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
> 
> 
> On Wed, Sep 27, 2017 at 8:34 AM, Sridhar Chellappa <flinkenthu@gmail.com <ma...@gmail.com>> wrote:
> Here is the snippet :
> 
> public interface Rule {
>     DataStream<Alert> run();
> }
> 
> public class Rule1 implements Rule {
> 
>     private static final String RULE_ID = "Rule1"
> 
>     @Override
>     public DataStream<Alert> run() {
> 
> 
>         Pattern<MyMessage1, ?> MyMessage1Pattern =
>                 Pattern.<MyMessage1>begin("first").
>                         subtype(MyMessage1.class).
>                         next("second").
>                         subtype(MyMessage1.class).
>                         within(Time.minutes(15);
> 
>         PatternStream<MyMessage1> MyMessage1PatternStream =
>                 CEP.pattern(
>                         MyMessage1DataStream.keyBy("field1", "field2"),
>                         MyMessage1Pattern
>                 );
> 
>        return (MyMessage1PatternStream.select(
>                 new PatternSelectFunction<MyMessage1, Alert>() {
>                     @Override
>                     public Alert select(Map<String, List<MyMessage1>> pattern) throws Exception {
> 
>                         String alertMessage = String.format("Cep Alert. Rule ID : %s" RULE_ID);
> 
>                         return new CEPAlert(alertMessage);
>                     }
>                 }
>             )
>         );
> 
>     }
> 
> 
> 
>     private static List<Rule> getStream1RulesToExecute(DataStream<MyMessage1> MyMessage1DataStream) {
>         List<Rule> rules = new ArrayList<Rule>();
> 
>         rules.add(new Rule1(MyMessage1DataStream));
> 
>         return rules;
>     }
> 
>     
>     private static List<Rule> getStream2RulesToExecute(DataStream<MyMessage1> MyMessage1DataStream) {
>         List<Rule> rules = new ArrayList<Rule>();
> 
>         rules.add(new Rule2(MyMessage1DataStream));
>         return rules;
>     }
> 
> 
>    public RichParallelSourceFunction<MyMessage1> getStreamSource1(StreamExecutionEnvironment env, ParameterTool parameterTool) {
> 
> 
>         env.enableCheckpointing(parameterTool.getInt(CHECKPOINT_INTERVAL, DEFAULT_CHECKPOINT_INTERVAL));
>         env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>         env.getCheckpointConfig().setMinPauseBetweenCheckpoints(DEFAULT_MIN_PAUSE_BETWEEN_CHECKPOINTS);
>         env.getCheckpointConfig().setCheckpointTimeout(CheckpointConfig.DEFAULT_TIMEOUT);
>         env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
> 
> 
>         KafkaDataSource<T> flinkCepConsumer =
>                 new KafkaDataSource<MyMessage1>(parameterTool, new MyMessage1SerDeSchema());
> 
>         return flinkCepConsumer;
>     }
> 
> 
>    public RichParallelSourceFunction<MyMessage2> getStreamSource2(StreamExecutionEnvironment env, ParameterTool parameterTool) {
> 
> 
>         env.enableCheckpointing(parameterTool.getInt(CHECKPOINT_INTERVAL, DEFAULT_CHECKPOINT_INTERVAL));
>         env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>         env.getCheckpointConfig().setMinPauseBetweenCheckpoints(DEFAULT_MIN_PAUSE_BETWEEN_CHECKPOINTS);
>         env.getCheckpointConfig().setCheckpointTimeout(CheckpointConfig.DEFAULT_TIMEOUT);
>         env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
> 
> 
>         KafkaDataSource<T> flinkCepConsumer =
>                 new KafkaDataSource<MyMessage1>(parameterTool, new MyMessage2SerDeSchema());
> 
>         return flinkCepConsumer;
>     }
> 
> 
>     public static void main(String[] args) throws Exception {
>         ParameterTool parameterTool = ParameterTool.fromPropertiesFile(args[PROPS_FILE_ARG_INDEX]);
> 
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> 
>         env.getConfig().setGlobalJobParameters(parameterTool);
> 
>         DataStream<MyMessage1> message1Stream = env.addSource(
>             getStreamSource1(env, parameterTool);
>         );
> 
> 
>         DataStream<MyMessge2> message2Stream = env.addSource(
>             getStreamSource2(env, parameterTool);
>         );
> 
> 
>         getStream1RulesToExecute(message1Stream).forEach(rule -> rule.run().print());
>         getStream2RulesToExecute(message2tream).forEach(rule -> rule.run().print());
>         env.execute(STREAMING_JOB_NAME);
>     }
> 
> 
> 
> 
> 
> 
> On Mon, Sep 25, 2017 at 3:13 PM, Tzu-Li (Gordon) Tai <tzulitai@apache.org <ma...@apache.org>> wrote:
> I talked a bit with Kostas on what may be happening here.
> 
> It could be that your patterns are not closing, which depends on the pattern construction of your CEP job.
> Could you perhaps provide an overview / code snippet of what your CEP job is doing?
> 
> Looping Kostas (in CC) also to this thread as he may have a better idea what is happening here.
> 
> Cheers,
> Gordon
> 
> On 22 September 2017 at 4:09:07 PM, Sridhar Chellappa (flinkenthu@gmail.com <ma...@gmail.com>) wrote:
> 
>> Thanks for the reply. Well, tracing back to the root cause, I see the following:
>> 
>> 1. At the Job manager, the Checkpoint times are getting worse :
>> 
>> Jobmanager :
>> 
>> Checkpoint times are getting worse progressively.
>> 
>> 2017-09-16 05:05:50,813 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 1 @ 1505538350809
>> 2017-09-16 05:05:51,396 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 1 (11101233 bytes in 586 ms).
>> 2017-09-16 05:07:30,809 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 2 @ 1505538450809
>> 2017-09-16 05:07:31,657 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 2 (18070955 bytes in 583 ms).
>> 
>>                                                           .
>>                                                           .
>>                                                           .
>>                                                           .
>>                                                           .
>>                                                           .
>>                                                           .
>>                                                           .
>>                                                           .
>>                                                           .
>>                                                           .
>>                                                           .
>>                                                           .
>> 2017-09-16 07:32:58,117 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 89 (246125113 bytes in 27194 ms).
>> 2017-09-16 07:34:10,809 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 90 @ 1505547250809
>> 2017-09-16 07:34:44,932 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 90 (248272325 bytes in 34012 ms).
>> 2017-09-16 07:35:50,809 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 91 @ 1505547350809
>> 2017-09-16 07:36:37,058 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 91 (250348812 bytes in 46136 ms).
>> 2017-09-16 07:37:30,809 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 92 @ 1505547450809
>> 2017-09-16 07:38:18,076 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 92 (252399724 bytes in 47152 ms).
>> 2017-09-16 07:39:10,809 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 93 @ 1505547550809
>> 2017-09-16 07:40:13,494 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 93 (254374636 bytes in 62573 ms).
>> 2017-09-16 07:40:50,809 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 94 @ 1505547650809
>> 2017-09-16 07:42:42,850 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 94 (256386533 bytes in 111898 ms).
>> 2017-09-16 07:42:42,850 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 95 @ 1505547762850
>> 2017-09-16 07:46:06,241 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 95 (258441766 bytes in 203268 ms).
>> 2017-09-16 07:46:06,241 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 96 @ 1505547966241
>> 2017-09-16 07:48:42,069 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - KeyedCEPPatternOperator -> Map (1/4) (ff835faa9eb9182ed2f2230a1e5cc56d) switched from RUNNING to FAILED.
>> AsynchronousException{java.lang.Exception: Could not materialize checkpoint 96 for operator KeyedCEPPatternOperator -> Map (1/4).}
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970)
>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>     at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.Exception: Could not materialize checkpoint 96 for operator KeyedCEPPatternOperator -> Map (1/4).
>>     ... 6 more
>> Caused by: java.util.concurrent.ExecutionException: java.lang.OutOfMemoryError: GC overhead limit exceeded
>>     at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>     at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
>>     ... 5 more
>> 
>> 
>> So, it looks like the Job Manager ran out of memory, thanks to the "Progressively Getting Worse" checkpoints. Any ideas on how to make sure the checkpoints faster?
>> 
>>                                                          
>> 
>> 
>> 
>> 
>> On Thu, Sep 21, 2017 at 7:29 PM, Tzu-Li (Gordon) Tai <tzulitai@apache.org <ma...@apache.org>> wrote:
>> Hi Sridhar,
>> 
>> Sorry that this didn't get a response earlier.
>> 
>> According to the trace, it seems like the job failed during the process, and
>> when trying to automatically restore from a checkpoint, deserialization of a
>> CEP `IterativeCondition` object failed. As far as I can tell, CEP operators
>> are just using Java serialization on CEP `IterativeCondition` objects, so
>> should not be related to the protobuf serializer that you are using.
>> 
>> Is this still constantly happening for you?
>> 
>> Cheers,
>> Gordon
>> 
>> 
>> 
>> --
>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/>
>> 
> 
> 


Re: StreamCorruptedException

Posted by Sridhar Chellappa <fl...@gmail.com>.
One more point to add.

I disabled checkpoints (by commenting out code that calls
enableCheckpointing()) and re-ran the job this time with plenty of memory
to the job manager

~/flink-1.3.2/bin/yarn-session.sh -n 4 -jm 24576 -tm 24576 -s 2 -d

At the Jobmanager, I am still hitting:

2017-09-25 06:46:44,066 INFO
org.apache.flink.yarn.YarnApplicationMasterRunner             -  Starting
YARN ApplicationMaster / ResourceManager / JobManager (Version: 1.3.2,
Rev:0399bee, Date:03.08.2017 @ 10:23:11 UTC)
2017-09-25 06:46:44,066 INFO
org.apache.flink.yarn.YarnApplicationMasterRunner             -  Current
user: flink
2017-09-25 06:46:44,066 INFO
org.apache.flink.yarn.YarnApplicationMasterRunner             -  JVM:
OpenJDK 64-Bit Server VM - Oracle Corporation - 1.8/25.131-b11
2017-09-25 06:46:44,066 INFO
org.apache.flink.yarn.YarnApplicationMasterRunner             -  Maximum
heap size: 16384 MiBytes
2017-09-25 06:46:44,066 INFO
org.apache.flink.yarn.YarnApplicationMasterRunner             -  JAVA_HOME:
/usr/lib/jvm/java-8-openjdk-amd64
2017-09-25 06:46:44,067 INFO
org.apache.flink.yarn.YarnApplicationMasterRunner             -  Hadoop
version: 2.7.2
2017-09-25 06:46:44,067 INFO
org.apache.flink.yarn.YarnApplicationMasterRunner             -  JVM
Options:
2017-09-25 06:46:44,067 INFO
org.apache.flink.yarn.YarnApplicationMasterRunner             -
-Xmx18432m
2017-09-25 06:46:44,067 INFO
org.apache.flink.yarn.YarnApplicationMasterRunner             -
-Dlog.file=/var/log/hadoop-yarn/userlogs/application_1506317793012_0001/container_1506317793012_0001_01_000001/jobmanager.log
2017-09-25 06:46:44,067 INFO
org.apache.flink.yarn.YarnApplicationMasterRunner             -
-Dlogback.configurationFile=file:logback.xml
2017-09-25 06:46:44,067 INFO
org.apache.flink.yarn.YarnApplicationMasterRunner             -
-Dlog4j.configuration=file:log4j.properties
2017-09-25 06:46:44,067 INFO
org.apache.flink.yarn.YarnApplicationMasterRunner             -  Program
Arguments: (none)


.

.

2017-09-25 06:50:51,925 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source:
Custom Source -> (Filter -> Map -> Map, Filter -> Map -> Map, Filter -> Map
-> Map, Map -> (Filter, Filter)) (2/2) (e27860984c858738f044931e4b6a86a6)
switched from DEPLOYING to RUNNING.
2017-09-25 13:38:54,175 INFO
org.apache.flink.runtime.blob.BlobCache                       - Created
BLOB cache storage directory
/tmp/blobStore-3e0b96a1-904b-4acb-b0d3-9d88f2073e97
2017-09-25 13:38:54,187 INFO
org.apache.flink.runtime.blob.BlobCache                       - Downloading
49efe0ad58b727ba145b86df6088111c9a90ddd6 from localhost/127.0.0.1:55550
2017-09-25 16:30:39,974 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph        -
KeyedCEPPatternOperator -> Map (2/2) (e464ec796cd239a7b7fa225aaf86309a)
switched from RUNNING to CANCELED.
2017-09-25 16:30:39,975 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source:
Custom Source -> (Filter -> Map -> Map, Filter -> Map -> Map, Filter -> Map
-> Map, Map -> (Filter, Filter)) (2/2) (e27860984c858738f044931e4b6a86a6)
switched from RUNNING to FAILED.
java.lang.OutOfMemoryError: GC overhead limit exceeded
        at sun.reflect.GeneratedConstructorAccessor10.newInstance(Unknown
Source)
        at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at
com.twitter.chill.Instantiators$$anonfun$normalJava$1.apply(KryoBase.scala:160)
        at
com.twitter.chill.Instantiators$$anon$1.newInstance(KryoBase.scala:123)
        at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1061)
        at
com.esotericsoftware.kryo.serializers.FieldSerializer.createCopy(FieldSerializer.java:620)
        at
com.esotericsoftware.kryo.serializers.FieldSerializer.copy(FieldSerializer.java:624)
        at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
        at
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:176)
        at
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:239)
        at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:526)
        at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
        at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
        at
org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:575)
        at
org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:536)
        at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
        at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
        at
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
        at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
        at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
        at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
        at
org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:575)
        at
org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:536)
        at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)


On Wed, Sep 27, 2017 at 8:34 AM, Sridhar Chellappa <fl...@gmail.com>
wrote:

> Here is the snippet :
>
> public interface Rule {
>     DataStream<Alert> run();
> }
>
> public class Rule1 implements Rule {
>
>     private static final String RULE_ID = "Rule1"
>
>     @Override
>     public DataStream<Alert> run() {
>
>
>         Pattern<MyMessage1, ?> MyMessage1Pattern =
>                 Pattern.<MyMessage1>begin("first").
>                         subtype(MyMessage1.class).
>                         next("second").
>                         subtype(MyMessage1.class).
>                         within(Time.minutes(15);
>
>         PatternStream<MyMessage1> MyMessage1PatternStream =
>                 CEP.pattern(
>                         MyMessage1DataStream.keyBy("field1", "field2"),
>                         MyMessage1Pattern
>                 );
>
>        return (MyMessage1PatternStream.select(
>                 new PatternSelectFunction<MyMessage1, Alert>() {
>                     @Override
>                     public Alert select(Map<String, List<MyMessage1>>
> pattern) throws Exception {
>
>                         String alertMessage = String.format("Cep Alert.
> Rule ID : %s" RULE_ID);
>
>                         return new CEPAlert(alertMessage);
>                     }
>                 }
>             )
>         );
>
>     }
>
>
>
>     private static List<Rule> getStream1RulesToExecute(DataStream<MyMessage1>
> MyMessage1DataStream) {
>         List<Rule> rules = new ArrayList<Rule>();
>
>         rules.add(new Rule1(MyMessage1DataStream));
>
>         return rules;
>     }
>
>
>     private static List<Rule> getStream2RulesToExecute(DataStream<MyMessage1>
> MyMessage1DataStream) {
>         List<Rule> rules = new ArrayList<Rule>();
>
>         rules.add(new Rule2(MyMessage1DataStream));
>         return rules;
>     }
>
>
>    public RichParallelSourceFunction<MyMessage1> getStreamSource1(StreamExecutionEnvironment
> env, ParameterTool parameterTool) {
>
>
>         env.enableCheckpointing(parameterTool.getInt(CHECKPOINT_INTERVAL,
> DEFAULT_CHECKPOINT_INTERVAL));
>         env.getCheckpointConfig().setCheckpointingMode(
> CheckpointingMode.EXACTLY_ONCE);
>         env.getCheckpointConfig().setMinPauseBetweenCheckpoints(
> DEFAULT_MIN_PAUSE_BETWEEN_CHECKPOINTS);
>         env.getCheckpointConfig().setCheckpointTimeout(
> CheckpointConfig.DEFAULT_TIMEOUT);
>         env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
>
>
>         KafkaDataSource<T> flinkCepConsumer =
>                 new KafkaDataSource<MyMessage1>(parameterTool, new
> MyMessage1SerDeSchema());
>
>         return flinkCepConsumer;
>     }
>
>
>    public RichParallelSourceFunction<MyMessage2> getStreamSource2(StreamExecutionEnvironment
> env, ParameterTool parameterTool) {
>
>
>         env.enableCheckpointing(parameterTool.getInt(CHECKPOINT_INTERVAL,
> DEFAULT_CHECKPOINT_INTERVAL));
>         env.getCheckpointConfig().setCheckpointingMode(
> CheckpointingMode.EXACTLY_ONCE);
>         env.getCheckpointConfig().setMinPauseBetweenCheckpoints(
> DEFAULT_MIN_PAUSE_BETWEEN_CHECKPOINTS);
>         env.getCheckpointConfig().setCheckpointTimeout(
> CheckpointConfig.DEFAULT_TIMEOUT);
>         env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
>
>
>         KafkaDataSource<T> flinkCepConsumer =
>                 new KafkaDataSource<MyMessage1>(parameterTool, new
> MyMessage2SerDeSchema());
>
>         return flinkCepConsumer;
>     }
>
>
>     public static void main(String[] args) throws Exception {
>         ParameterTool parameterTool = ParameterTool.
> fromPropertiesFile(args[PROPS_FILE_ARG_INDEX]);
>
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.
> getExecutionEnvironment();
>
>         env.getConfig().setGlobalJobParameters(parameterTool);
>
>         DataStream<MyMessage1> message1Stream = env.addSource(
>             getStreamSource1(env, parameterTool);
>         );
>
>
>         DataStream<MyMessge2> message2Stream = env.addSource(
>             getStreamSource2(env, parameterTool);
>         );
>
>
>         getStream1RulesToExecute(message1Stream).forEach(rule ->
> rule.run().print());
>         getStream2RulesToExecute(message2tream).forEach(rule ->
> rule.run().print());
>         env.execute(STREAMING_JOB_NAME);
>     }
>
>
>
>
>
>
> On Mon, Sep 25, 2017 at 3:13 PM, Tzu-Li (Gordon) Tai <tz...@apache.org>
> wrote:
>
>> I talked a bit with Kostas on what may be happening here.
>>
>> It could be that your patterns are not closing, which depends on the
>> pattern construction of your CEP job.
>> Could you perhaps provide an overview / code snippet of what your CEP job
>> is doing?
>>
>> Looping Kostas (in CC) also to this thread as he may have a better idea
>> what is happening here.
>>
>> Cheers,
>> Gordon
>>
>> On 22 September 2017 at 4:09:07 PM, Sridhar Chellappa (
>> flinkenthu@gmail.com) wrote:
>>
>> Thanks for the reply. Well, tracing back to the root cause, I see the
>> following:
>>
>> 1. At the Job manager, the Checkpoint times are getting worse :
>>
>> Jobmanager :
>>
>> Checkpoint times are getting worse progressively.
>>
>> 2017-09-16 05:05:50,813 INFO  org.apache.flink.runtime.check
>> point.CheckpointCoordinator     - Triggering checkpoint 1 @ 1505538350809
>> 2017-09-16 05:05:51,396 INFO  org.apache.flink.runtime.check
>> point.CheckpointCoordinator     - Completed checkpoint 1 (11101233 bytes
>> in 586 ms).
>> 2017-09-16 05:07:30,809 INFO  org.apache.flink.runtime.check
>> point.CheckpointCoordinator     - Triggering checkpoint 2 @ 1505538450809
>> 2017-09-16 05:07:31,657 INFO  org.apache.flink.runtime.check
>> point.CheckpointCoordinator     - Completed checkpoint 2 (18070955 bytes
>> in 583 ms).
>>
>>                                                           .
>>                                                           .
>>                                                           .
>>                                                           .
>>                                                           .
>>                                                           .
>>                                                           .
>>                                                           .
>>                                                           .
>>                                                           .
>>                                                           .
>>                                                           .
>>                                                           .
>> 2017-09-16 07:32:58,117 INFO  org.apache.flink.runtime.check
>> point.CheckpointCoordinator     - Completed checkpoint 89 (246125113
>> bytes in 27194 ms).
>> 2017-09-16 07:34:10,809 INFO  org.apache.flink.runtime.check
>> point.CheckpointCoordinator     - Triggering checkpoint 90 @
>> 1505547250809
>> 2017-09-16 07:34:44,932 INFO  org.apache.flink.runtime.check
>> point.CheckpointCoordinator     - Completed checkpoint 90 (248272325
>> bytes in 34012 ms).
>> 2017-09-16 07:35:50,809 INFO  org.apache.flink.runtime.check
>> point.CheckpointCoordinator     - Triggering checkpoint 91 @
>> 1505547350809
>> 2017-09-16 07:36:37,058 INFO  org.apache.flink.runtime.check
>> point.CheckpointCoordinator     - Completed checkpoint 91 (250348812
>> bytes in 46136 ms).
>> 2017-09-16 07:37:30,809 INFO  org.apache.flink.runtime.check
>> point.CheckpointCoordinator     - Triggering checkpoint 92 @
>> 1505547450809
>> 2017-09-16 07:38:18,076 INFO  org.apache.flink.runtime.check
>> point.CheckpointCoordinator     - Completed checkpoint 92 (252399724
>> bytes in 47152 ms).
>> 2017-09-16 07:39:10,809 INFO  org.apache.flink.runtime.check
>> point.CheckpointCoordinator     - Triggering checkpoint 93 @
>> 1505547550809
>> 2017-09-16 07:40:13,494 INFO  org.apache.flink.runtime.check
>> point.CheckpointCoordinator     - Completed checkpoint 93 (254374636
>> bytes in 62573 ms).
>> 2017-09-16 07:40:50,809 INFO  org.apache.flink.runtime.check
>> point.CheckpointCoordinator     - Triggering checkpoint 94 @
>> 1505547650809
>> 2017-09-16 07:42:42,850 INFO  org.apache.flink.runtime.check
>> point.CheckpointCoordinator     - Completed checkpoint 94 (256386533
>> bytes in 111898 ms).
>> 2017-09-16 07:42:42,850 INFO  org.apache.flink.runtime.check
>> point.CheckpointCoordinator     - Triggering checkpoint 95 @
>> 1505547762850
>> 2017-09-16 07:46:06,241 INFO  org.apache.flink.runtime.check
>> point.CheckpointCoordinator     - Completed checkpoint 95 (258441766
>> bytes in 203268 ms).
>> 2017-09-16 07:46:06,241 INFO  org.apache.flink.runtime.check
>> point.CheckpointCoordinator     - Triggering checkpoint 96 @
>> 1505547966241
>> 2017-09-16 07:48:42,069 INFO  org.apache.flink.runtime.execu
>> tiongraph.ExecutionGraph        - KeyedCEPPatternOperator -> Map (1/4)
>> (ff835faa9eb9182ed2f2230a1e5cc56d) switched from RUNNING to FAILED.
>> AsynchronousException{java.lang.Exception: Could not materialize
>> checkpoint 96 for operator KeyedCEPPatternOperator -> Map (1/4).}
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncChe
>> ckpointRunnable.run(StreamTask.java:970)
>>     at java.util.concurrent.Executors$RunnableAdapter.call(
>> Executors.java:511)
>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPool
>> Executor.java:1142)
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoo
>> lExecutor.java:617)
>>     at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.Exception: Could not materialize checkpoint 96 for
>> operator KeyedCEPPatternOperator -> Map (1/4).
>>     ... 6 more
>> Caused by: java.util.concurrent.ExecutionException:
>> java.lang.OutOfMemoryError: GC overhead limit exceeded
>>     at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>     at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUt
>> il.java:43)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncChe
>> ckpointRunnable.run(StreamTask.java:897)
>>     ... 5 more
>>
>>
>> So, it looks like the Job Manager ran out of memory, thanks to the
>> "Progressively Getting Worse" checkpoints. Any ideas on how to make sure
>> the checkpoints faster?
>>
>>
>>
>>
>>
>>
>> On Thu, Sep 21, 2017 at 7:29 PM, Tzu-Li (Gordon) Tai <tzulitai@apache.org
>> > wrote:
>>
>>> Hi Sridhar,
>>>
>>> Sorry that this didn't get a response earlier.
>>>
>>> According to the trace, it seems like the job failed during the process,
>>> and
>>> when trying to automatically restore from a checkpoint, deserialization
>>> of a
>>> CEP `IterativeCondition` object failed. As far as I can tell, CEP
>>> operators
>>> are just using Java serialization on CEP `IterativeCondition` objects, so
>>> should not be related to the protobuf serializer that you are using.
>>>
>>> Is this still constantly happening for you?
>>>
>>> Cheers,
>>> Gordon
>>>
>>>
>>>
>>> --
>>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nab
>>> ble.com/
>>>
>>
>>
>

Re: StreamCorruptedException

Posted by Sridhar Chellappa <fl...@gmail.com>.
Here is the snippet :

public interface Rule {
    DataStream<Alert> run();
}

public class Rule1 implements Rule {

    private static final String RULE_ID = "Rule1"

    @Override
    public DataStream<Alert> run() {


        Pattern<MyMessage1, ?> MyMessage1Pattern =
                Pattern.<MyMessage1>begin("first").
                        subtype(MyMessage1.class).
                        next("second").
                        subtype(MyMessage1.class).
                        within(Time.minutes(15);

        PatternStream<MyMessage1> MyMessage1PatternStream =
                CEP.pattern(
                        MyMessage1DataStream.keyBy("field1", "field2"),
                        MyMessage1Pattern
                );

       return (MyMessage1PatternStream.select(
                new PatternSelectFunction<MyMessage1, Alert>() {
                    @Override
                    public Alert select(Map<String, List<MyMessage1>>
pattern) throws Exception {

                        String alertMessage = String.format("Cep Alert.
Rule ID : %s" RULE_ID);

                        return new CEPAlert(alertMessage);
                    }
                }
            )
        );

    }



    private static List<Rule>
getStream1RulesToExecute(DataStream<MyMessage1> MyMessage1DataStream) {
        List<Rule> rules = new ArrayList<Rule>();

        rules.add(new Rule1(MyMessage1DataStream));

        return rules;
    }


    private static List<Rule>
getStream2RulesToExecute(DataStream<MyMessage1> MyMessage1DataStream) {
        List<Rule> rules = new ArrayList<Rule>();

        rules.add(new Rule2(MyMessage1DataStream));
        return rules;
    }


   public RichParallelSourceFunction<MyMessage1>
getStreamSource1(StreamExecutionEnvironment env, ParameterTool
parameterTool) {


        env.enableCheckpointing(parameterTool.getInt(CHECKPOINT_INTERVAL,
DEFAULT_CHECKPOINT_INTERVAL));

env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

env.getCheckpointConfig().setMinPauseBetweenCheckpoints(DEFAULT_MIN_PAUSE_BETWEEN_CHECKPOINTS);

env.getCheckpointConfig().setCheckpointTimeout(CheckpointConfig.DEFAULT_TIMEOUT);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);


        KafkaDataSource<T> flinkCepConsumer =
                new KafkaDataSource<MyMessage1>(parameterTool, new
MyMessage1SerDeSchema());

        return flinkCepConsumer;
    }


   public RichParallelSourceFunction<MyMessage2>
getStreamSource2(StreamExecutionEnvironment env, ParameterTool
parameterTool) {


        env.enableCheckpointing(parameterTool.getInt(CHECKPOINT_INTERVAL,
DEFAULT_CHECKPOINT_INTERVAL));

env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

env.getCheckpointConfig().setMinPauseBetweenCheckpoints(DEFAULT_MIN_PAUSE_BETWEEN_CHECKPOINTS);

env.getCheckpointConfig().setCheckpointTimeout(CheckpointConfig.DEFAULT_TIMEOUT);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);


        KafkaDataSource<T> flinkCepConsumer =
                new KafkaDataSource<MyMessage1>(parameterTool, new
MyMessage2SerDeSchema());

        return flinkCepConsumer;
    }


    public static void main(String[] args) throws Exception {
        ParameterTool parameterTool =
ParameterTool.fromPropertiesFile(args[PROPS_FILE_ARG_INDEX]);

        StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();

        env.getConfig().setGlobalJobParameters(parameterTool);

        DataStream<MyMessage1> message1Stream = env.addSource(
            getStreamSource1(env, parameterTool);
        );


        DataStream<MyMessge2> message2Stream = env.addSource(
            getStreamSource2(env, parameterTool);
        );


        getStream1RulesToExecute(message1Stream).forEach(rule ->
rule.run().print());
        getStream2RulesToExecute(message2tream).forEach(rule ->
rule.run().print());
        env.execute(STREAMING_JOB_NAME);
    }






On Mon, Sep 25, 2017 at 3:13 PM, Tzu-Li (Gordon) Tai <tz...@apache.org>
wrote:

> I talked a bit with Kostas on what may be happening here.
>
> It could be that your patterns are not closing, which depends on the
> pattern construction of your CEP job.
> Could you perhaps provide an overview / code snippet of what your CEP job
> is doing?
>
> Looping Kostas (in CC) also to this thread as he may have a better idea
> what is happening here.
>
> Cheers,
> Gordon
>
> On 22 September 2017 at 4:09:07 PM, Sridhar Chellappa (
> flinkenthu@gmail.com) wrote:
>
> Thanks for the reply. Well, tracing back to the root cause, I see the
> following:
>
> 1. At the Job manager, the Checkpoint times are getting worse :
>
> Jobmanager :
>
> Checkpoint times are getting worse progressively.
>
> 2017-09-16 05:05:50,813 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> - Triggering checkpoint 1 @ 1505538350809
> 2017-09-16 05:05:51,396 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> - Completed checkpoint 1 (11101233 bytes in 586 ms).
> 2017-09-16 05:07:30,809 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> - Triggering checkpoint 2 @ 1505538450809
> 2017-09-16 05:07:31,657 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> - Completed checkpoint 2 (18070955 bytes in 583 ms).
>
>                                                           .
>                                                           .
>                                                           .
>                                                           .
>                                                           .
>                                                           .
>                                                           .
>                                                           .
>                                                           .
>                                                           .
>                                                           .
>                                                           .
>                                                           .
> 2017-09-16 07:32:58,117 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> - Completed checkpoint 89 (246125113 bytes in 27194 ms).
> 2017-09-16 07:34:10,809 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> - Triggering checkpoint 90 @ 1505547250809
> 2017-09-16 07:34:44,932 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> - Completed checkpoint 90 (248272325 bytes in 34012 ms).
> 2017-09-16 07:35:50,809 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> - Triggering checkpoint 91 @ 1505547350809
> 2017-09-16 07:36:37,058 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> - Completed checkpoint 91 (250348812 bytes in 46136 ms).
> 2017-09-16 07:37:30,809 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> - Triggering checkpoint 92 @ 1505547450809
> 2017-09-16 07:38:18,076 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> - Completed checkpoint 92 (252399724 bytes in 47152 ms).
> 2017-09-16 07:39:10,809 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> - Triggering checkpoint 93 @ 1505547550809
> 2017-09-16 07:40:13,494 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> - Completed checkpoint 93 (254374636 bytes in 62573 ms).
> 2017-09-16 07:40:50,809 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> - Triggering checkpoint 94 @ 1505547650809
> 2017-09-16 07:42:42,850 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> - Completed checkpoint 94 (256386533 bytes in 111898 ms).
> 2017-09-16 07:42:42,850 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> - Triggering checkpoint 95 @ 1505547762850
> 2017-09-16 07:46:06,241 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> - Completed checkpoint 95 (258441766 bytes in 203268 ms).
> 2017-09-16 07:46:06,241 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> - Triggering checkpoint 96 @ 1505547966241
> 2017-09-16 07:48:42,069 INFO  org.apache.flink.runtime.
> executiongraph.ExecutionGraph        - KeyedCEPPatternOperator -> Map
> (1/4) (ff835faa9eb9182ed2f2230a1e5cc56d) switched from RUNNING to FAILED.
> AsynchronousException{java.lang.Exception: Could not materialize
> checkpoint 96 for operator KeyedCEPPatternOperator -> Map (1/4).}
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$
> AsyncCheckpointRunnable.run(StreamTask.java:970)
>     at java.util.concurrent.Executors$RunnableAdapter.
> call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.Exception: Could not materialize checkpoint 96 for
> operator KeyedCEPPatternOperator -> Map (1/4).
>     ... 6 more
> Caused by: java.util.concurrent.ExecutionException:
> java.lang.OutOfMemoryError: GC overhead limit exceeded
>     at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>     at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(
> FutureUtil.java:43)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$
> AsyncCheckpointRunnable.run(StreamTask.java:897)
>     ... 5 more
>
>
> So, it looks like the Job Manager ran out of memory, thanks to the
> "Progressively Getting Worse" checkpoints. Any ideas on how to make sure
> the checkpoints faster?
>
>
>
>
>
>
> On Thu, Sep 21, 2017 at 7:29 PM, Tzu-Li (Gordon) Tai <tz...@apache.org>
> wrote:
>
>> Hi Sridhar,
>>
>> Sorry that this didn't get a response earlier.
>>
>> According to the trace, it seems like the job failed during the process,
>> and
>> when trying to automatically restore from a checkpoint, deserialization
>> of a
>> CEP `IterativeCondition` object failed. As far as I can tell, CEP
>> operators
>> are just using Java serialization on CEP `IterativeCondition` objects, so
>> should not be related to the protobuf serializer that you are using.
>>
>> Is this still constantly happening for you?
>>
>> Cheers,
>> Gordon
>>
>>
>>
>> --
>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.
>> nabble.com/
>>
>
>

Re: StreamCorruptedException

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
I talked a bit with Kostas on what may be happening here.

It could be that your patterns are not closing, which depends on the pattern construction of your CEP job.
Could you perhaps provide an overview / code snippet of what your CEP job is doing?

Looping Kostas (in CC) also to this thread as he may have a better idea what is happening here.

Cheers,
Gordon
On 22 September 2017 at 4:09:07 PM, Sridhar Chellappa (flinkenthu@gmail.com) wrote:

Thanks for the reply. Well, tracing back to the root cause, I see the following:

1. At the Job manager, the Checkpoint times are getting worse :

Jobmanager :

Checkpoint times are getting worse progressively.

2017-09-16 05:05:50,813 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 1 @ 1505538350809
2017-09-16 05:05:51,396 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 1 (11101233 bytes in 586 ms).
2017-09-16 05:07:30,809 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 2 @ 1505538450809
2017-09-16 05:07:31,657 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 2 (18070955 bytes in 583 ms).

                                                          .
                                                          .
                                                          .
                                                          .
                                                          .
                                                          .
                                                          .
                                                          .
                                                          .
                                                          .
                                                          .
                                                          .
                                                          .
2017-09-16 07:32:58,117 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 89 (246125113 bytes in 27194 ms).
2017-09-16 07:34:10,809 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 90 @ 1505547250809
2017-09-16 07:34:44,932 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 90 (248272325 bytes in 34012 ms).
2017-09-16 07:35:50,809 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 91 @ 1505547350809
2017-09-16 07:36:37,058 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 91 (250348812 bytes in 46136 ms).
2017-09-16 07:37:30,809 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 92 @ 1505547450809
2017-09-16 07:38:18,076 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 92 (252399724 bytes in 47152 ms).
2017-09-16 07:39:10,809 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 93 @ 1505547550809
2017-09-16 07:40:13,494 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 93 (254374636 bytes in 62573 ms).
2017-09-16 07:40:50,809 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 94 @ 1505547650809
2017-09-16 07:42:42,850 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 94 (256386533 bytes in 111898 ms).
2017-09-16 07:42:42,850 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 95 @ 1505547762850
2017-09-16 07:46:06,241 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 95 (258441766 bytes in 203268 ms).
2017-09-16 07:46:06,241 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 96 @ 1505547966241
2017-09-16 07:48:42,069 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - KeyedCEPPatternOperator -> Map (1/4) (ff835faa9eb9182ed2f2230a1e5cc56d) switched from RUNNING to FAILED.
AsynchronousException{java.lang.Exception: Could not materialize checkpoint 96 for operator KeyedCEPPatternOperator -> Map (1/4).}
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not materialize checkpoint 96 for operator KeyedCEPPatternOperator -> Map (1/4).
    ... 6 more
Caused by: java.util.concurrent.ExecutionException: java.lang.OutOfMemoryError: GC overhead limit exceeded
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
    ... 5 more


So, it looks like the Job Manager ran out of memory, thanks to the "Progressively Getting Worse" checkpoints. Any ideas on how to make sure the checkpoints faster?

                                                         




On Thu, Sep 21, 2017 at 7:29 PM, Tzu-Li (Gordon) Tai <tz...@apache.org> wrote:
Hi Sridhar,

Sorry that this didn't get a response earlier.

According to the trace, it seems like the job failed during the process, and
when trying to automatically restore from a checkpoint, deserialization of a
CEP `IterativeCondition` object failed. As far as I can tell, CEP operators
are just using Java serialization on CEP `IterativeCondition` objects, so
should not be related to the protobuf serializer that you are using.

Is this still constantly happening for you?

Cheers,
Gordon



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: StreamCorruptedException

Posted by Sridhar Chellappa <fl...@gmail.com>.
Thanks for the reply. Well, tracing back to the root cause, I see the
following:

1. At the Job manager, the Checkpoint times are getting worse :

Jobmanager :

Checkpoint times are getting worse progressively.

2017-09-16 05:05:50,813 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering
checkpoint 1 @ 1505538350809
2017-09-16 05:05:51,396 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed
checkpoint 1 (11101233 bytes in 586 ms).
2017-09-16 05:07:30,809 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering
checkpoint 2 @ 1505538450809
2017-09-16 05:07:31,657 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed
checkpoint 2 (18070955 bytes in 583 ms).

                                                          .
                                                          .
                                                          .
                                                          .
                                                          .
                                                          .
                                                          .
                                                          .
                                                          .
                                                          .
                                                          .
                                                          .
                                                          .
2017-09-16 07:32:58,117 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed
checkpoint 89 (246125113 bytes in 27194 ms).
2017-09-16 07:34:10,809 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering
checkpoint 90 @ 1505547250809
2017-09-16 07:34:44,932 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed
checkpoint 90 (248272325 bytes in 34012 ms).
2017-09-16 07:35:50,809 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering
checkpoint 91 @ 1505547350809
2017-09-16 07:36:37,058 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed
checkpoint 91 (250348812 bytes in 46136 ms).
2017-09-16 07:37:30,809 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering
checkpoint 92 @ 1505547450809
2017-09-16 07:38:18,076 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed
checkpoint 92 (252399724 bytes in 47152 ms).
2017-09-16 07:39:10,809 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering
checkpoint 93 @ 1505547550809
2017-09-16 07:40:13,494 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed
checkpoint 93 (254374636 bytes in 62573 ms).
2017-09-16 07:40:50,809 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering
checkpoint 94 @ 1505547650809
2017-09-16 07:42:42,850 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed
checkpoint 94 (256386533 bytes in 111898 ms).
2017-09-16 07:42:42,850 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering
checkpoint 95 @ 1505547762850
2017-09-16 07:46:06,241 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed
checkpoint 95 (258441766 bytes in 203268 ms).
2017-09-16 07:46:06,241 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering
checkpoint 96 @ 1505547966241
2017-09-16 07:48:42,069 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph        -
KeyedCEPPatternOperator -> Map (1/4) (ff835faa9eb9182ed2f2230a1e5cc56d)
switched from RUNNING to FAILED.
AsynchronousException{java.lang.Exception: Could not materialize checkpoint
96 for operator KeyedCEPPatternOperator -> Map (1/4).}
    at
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970)
    at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not materialize checkpoint 96 for
operator KeyedCEPPatternOperator -> Map (1/4).
    ... 6 more
Caused by: java.util.concurrent.ExecutionException:
java.lang.OutOfMemoryError: GC overhead limit exceeded
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at
org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
    at
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
    ... 5 more


So, it looks like the Job Manager ran out of memory, thanks to the
"Progressively Getting Worse" checkpoints. Any ideas on how to make sure
the checkpoints faster?






On Thu, Sep 21, 2017 at 7:29 PM, Tzu-Li (Gordon) Tai <tz...@apache.org>
wrote:

> Hi Sridhar,
>
> Sorry that this didn't get a response earlier.
>
> According to the trace, it seems like the job failed during the process,
> and
> when trying to automatically restore from a checkpoint, deserialization of
> a
> CEP `IterativeCondition` object failed. As far as I can tell, CEP operators
> are just using Java serialization on CEP `IterativeCondition` objects, so
> should not be related to the protobuf serializer that you are using.
>
> Is this still constantly happening for you?
>
> Cheers,
> Gordon
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>

Re: StreamCorruptedException

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi Sridhar,

Sorry that this didn't get a response earlier.

According to the trace, it seems like the job failed during the process, and
when trying to automatically restore from a checkpoint, deserialization of a
CEP `IterativeCondition` object failed. As far as I can tell, CEP operators
are just using Java serialization on CEP `IterativeCondition` objects, so
should not be related to the protobuf serializer that you are using.

Is this still constantly happening for you?

Cheers,
Gordon



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/