Posted to user@flink.apache.org by Timo Walther <tw...@apache.org> on 2021/02/01 14:17:40 UTC

Re: Problem restoring state

Hi Shridhar,

The exception indicates that something is wrong with the object
serialization: Kryo is unable to deserialize the stored object.

It might help to

1) register a custom Kryo serializer in the ExecutionConfig or

2) pass dedicated type information, using the types from
org.apache.flink.api.common.typeinfo.Types, so that Kryo is no longer
needed as the "generic" fallback serializer.
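The two suggestions above can be sketched as follows, assuming the Flink 1.10 Java APIs (flink-core and flink-streaming-java on the classpath). ThingState here is a hypothetical stand-in for com.contineo.ext.flink.core.ThingState, reduced to the only field known from the trace (sourceId); ThingStateKryoSerializer, ThingStateFunction and the state name "thing-state" are likewise illustrative names. roundTrip serializes and deserializes a value the way a state backend would, which is a quick way to check a serializer outside a running job.

```java
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

import java.io.IOException;
import java.io.UncheckedIOException;

public class ThingStateSerializationSketch {

    // Hypothetical stand-in for the real ThingState; only sourceId is known.
    // Public class, public no-arg constructor and public fields make it a Flink POJO.
    public static class ThingState {
        public String sourceId;
        public ThingState() {}
    }

    // Option 1 (hypothetical): a hand-written Kryo serializer for ThingState.
    public static class ThingStateKryoSerializer extends Serializer<ThingState> {
        @Override
        public void write(Kryo kryo, Output output, ThingState state) {
            output.writeString(state.sourceId);
        }

        @Override
        public ThingState read(Kryo kryo, Input input, Class<ThingState> type) {
            ThingState state = new ThingState();
            state.sourceId = input.readString();
            return state;
        }
    }

    // Option 1: register the custom serializer in the ExecutionConfig.
    public static TypeSerializer<ThingState> kryoWithCustomSerializer() {
        ExecutionConfig config = new ExecutionConfig();
        config.registerTypeWithKryoSerializer(ThingState.class, ThingStateKryoSerializer.class);
        return new KryoSerializer<>(ThingState.class, config);
    }

    // Option 2: explicit type information, so Flink uses its POJO serializer instead of Kryo.
    public static TypeSerializer<ThingState> pojoSerializer() {
        return Types.POJO(ThingState.class).createSerializer(new ExecutionConfig());
    }

    // Option 2 inside the keyed ProcessFunction: give the state descriptor explicit type info in open().
    public static class ThingStateFunction extends ProcessFunction<ThingState, String> {
        private transient ValueState<ThingState> lastState;

        @Override
        public void open(Configuration parameters) {
            lastState = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("thing-state", Types.POJO(ThingState.class)));
        }

        @Override
        public void processElement(ThingState value, Context ctx, Collector<String> out) throws Exception {
            lastState.update(value);
            out.collect(value.sourceId);
        }
    }

    // Writes a value with the given serializer and reads it back, as a state backend would.
    public static <T> T roundTrip(TypeSerializer<T> serializer, T value) {
        try {
            DataOutputSerializer out = new DataOutputSerializer(64);
            serializer.serialize(value, out);
            DataInputDeserializer in = new DataInputDeserializer(out.getCopyOfBuffer());
            return serializer.deserialize(in);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        ThingState state = new ThingState();
        state.sourceId = "sensor-42";
        System.out.println(roundTrip(pojoSerializer(), state).sourceId);
        System.out.println(roundTrip(kryoWithCustomSerializer(), state).sourceId);
    }
}
```

In a job, option 1 would be applied through env.getConfig().registerTypeWithKryoSerializer(...) before the pipeline is built. Either option changes the serializer of the stored state, so a snapshot written with the previous Kryo setup may be reported as incompatible on restore; it is safest to try the change on a fresh job or a test savepoint first.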

I hope this helps.

Regards,
Timo

On 28.01.21 12:41, Shridhar Kulkarni wrote:
> All,
> 
> We are getting the exception copied at the end of this post. It is
> thrown when a new Flink job is submitted and Flink tries to restore
> the previous state.
> 
> Environment:
>      Flink version: 1.10.1
>      State persistence: Hadoop 3.3
>      ZooKeeper 3.5.8
>      Parallelism: 4
> 
> The code implements the DataStream transformation chain ProcessFunction
> -> KeySelector -> ProcessFunction.
> Inbound messages are partitioned by the key "sourceId", which appears in
> the exception's serialization trace. sourceId is a String and is unique.
> -------
> Caused by: com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index: 109, Size: 10
> Serialization trace:
> sourceId (com.contineo.ext.flink.core.ThingState)
> -------
> 
> We have overridden the
> "org.apache.flink.streaming.api.functions.ProcessFunction.open()" method.
> Any help is appreciated.
> 
> 
> Exception stack trace:
> 
> 2021-01-19 19:59:56,934 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Checkpoint triggering task Source: Custom Source -> Process -> Process (3/4) of job c957f40043721b5cab3161991999a7ed is not in state RUNNING but DEPLOYING instead. Aborting checkpoint.
> 2021-01-19 19:59:57,358 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Process -> Sink: Unnamed (4/4) (b2605627c2fffc83dd412b3e7565244d) switched from RUNNING to FAILED.
> java.lang.Exception: Exception while creating StreamOperatorStateContext.
> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:989)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:453)
> at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:448)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for LegacyKeyedProcessOperator_c27dcf7b54ef6bfd6cff02ca8870b681_(4/4) from any of the 1 provided restore options.
> at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
> ... 9 more
> Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore heap backend
> at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:116)
> at org.apache.flink.runtime.state.filesystem.FsStateBackend.createKeyedStateBackend(FsStateBackend.java:529)
> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
> at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
> at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
> ... 11 more
> Caused by: com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index: 109, Size: 10
> Serialization trace:
> sourceId (com.contineo.ext.flink.core.ThingState)
> at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
> at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
> at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
> at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders.lambda$createV2PlusReader$0(StateTableByKeyGroupReaders.java:77)
> at org.apache.flink.runtime.state.KeyGroupPartitioner$PartitioningResultKeyGroupReader.readMappingsInKeyGroup(KeyGroupPartitioner.java:297)
> at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readKeyGroupStateData(HeapRestoreOperation.java:293)
> at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readStateHandleStateData(HeapRestoreOperation.java:254)
> at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:154)
> at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:114)
> ... 15 more
> Caused by: java.lang.IndexOutOfBoundsException: Index: 109, Size: 10
> at java.util.ArrayList.rangeCheck(ArrayList.java:659)
> at java.util.ArrayList.get(ArrayList.java:435)
> at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
> at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
> at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:728)
> at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113)
> ... 24 more
> 
> Your help is highly appreciated,
> 
> Thanks,
> Shridhar
>