Posted to user@flink.apache.org by vijayakumar palaniappan <vi...@gmail.com> on 2021/08/19 00:01:44 UTC

Error while deserializing the element

Setup Specifics:
Version: 1.6.2
RocksDB Map State
Timers stored in RocksDB

When this job has been running for a long period of time (more than 30
days) and then restarts for some reason, we encounter "Error while
deserializing the element". Is this a known issue that has been fixed in
later versions? I see some code changes for FLINK-10175, but we don't use
any queryable state.
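
For reference, the state and timers in this setup come from a keyed process
function roughly of the following shape (an illustrative sketch only; the
state name, key/value types, and timer delay are hypothetical, not taken
from the actual job):

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class ExampleTimerFunction extends KeyedProcessFunction<String, String, String> {

    // Kept in RocksDB when the RocksDB state backend is configured.
    private transient MapState<String, Long> lastSeen;

    @Override
    public void open(Configuration parameters) {
        lastSeen = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("lastSeen", String.class, Long.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        long fireAt = ctx.timerService().currentProcessingTime() + 60_000L;
        lastSeen.put(value, fireAt);
        // With timers stored in RocksDB, this registration ends up in the
        // RocksDB-backed timer queue that fails to deserialize on restore.
        ctx.timerService().registerProcessingTimeTimer(fireAt);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        out.collect("timer fired at " + timestamp);
    }
}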

Below is the stack trace

org.apache.flink.util.FlinkRuntimeException: Error while deserializing the element.
    at org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.deserializeElement(RocksDBCachingPriorityQueueSet.java:389)
    at org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.peek(RocksDBCachingPriorityQueueSet.java:146)
    at org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.peek(RocksDBCachingPriorityQueueSet.java:56)
    at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue$InternalPriorityQueueComparator.comparePriority(KeyGroupPartitionedPriorityQueue.java:274)
    at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue$InternalPriorityQueueComparator.comparePriority(KeyGroupPartitionedPriorityQueue.java:261)
    at org.apache.flink.runtime.state.heap.HeapPriorityQueue.isElementPriorityLessThen(HeapPriorityQueue.java:164)
    at org.apache.flink.runtime.state.heap.HeapPriorityQueue.siftUp(HeapPriorityQueue.java:121)
    at org.apache.flink.runtime.state.heap.HeapPriorityQueue.addInternal(HeapPriorityQueue.java:85)
    at org.apache.flink.runtime.state.heap.AbstractHeapPriorityQueue.add(AbstractHeapPriorityQueue.java:73)
    at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.<init>(KeyGroupPartitionedPriorityQueue.java:89)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBPriorityQueueSetFactory.create(RocksDBKeyedStateBackend.java:2792)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.create(RocksDBKeyedStateBackend.java:450)
    at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.createTimerPriorityQueue(InternalTimeServiceManager.java:121)
    at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.registerOrGetTimerService(InternalTimeServiceManager.java:106)
    at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.getInternalTimerService(InternalTimeServiceManager.java:87)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.getInternalTimerService(AbstractStreamOperator.java:764)
    at org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:61)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.EOFException
    at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:290)
    at org.apache.flink.types.StringValue.readString(StringValue.java:769)
    at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
    at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:179)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:46)
    at org.apache.flink.streaming.api.operators.TimerSerializer.deserialize(TimerSerializer.java:168)
    at org.apache.flink.streaming.api.operators.TimerSerializer.deserialize(TimerSerializer.java:45)
    at org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.deserializeElement(RocksDBCachingPriorityQueueSet.java:387)
    ... 20 more

-- 
Thanks,
-Vijay

Re: Error while deserializing the element

Posted by JING ZHANG <be...@gmail.com>.
My previous response on the mailing list was too brief, sorry about that.
Regarding the use of the config 'state.backend.rocksdb.timer-service.factory:
HEAP', I would like to add some information. Please correct me if anything is
wrong.

   1. The configuration helps avoid the EOF exception when recovering the
   RocksDB priority queue set from a savepoint/checkpoint file. If we use HEAP
   for timers, the timer state and the other operator state are no longer mixed
   in one RocksDB instance, so the exception is avoided. The jobs that added
   this configuration have not encountered the problem again.
   2. After switching timers from RocksDB storage to HEAP, the job can indeed
   no longer be restored from an existing savepoint/checkpoint. If a user
   mistakenly recovers the job from such a savepoint/checkpoint file, the job
   will not throw an exception, but the timer state will be silently lost. This
   is unreasonable, and we could add a JIRA to fix the issue. We suggest our
   users restart the job in backfill mode instead of restoring from cp/sp files.
   3. When we use 'state.backend.rocksdb.timer-service.factory: HEAP', the job
   may consume more memory. Therefore, we need to pay close attention to memory
   usage and increase memory resources if necessary (a sketch of applying the
   setting follows this list).
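
For completeness, below is a minimal sketch of applying the setting when
constructing the RocksDB state backend. It assumes a later Flink release in
which the backend exposes a programmatic setter (setPriorityQueueStateType and
the PriorityQueueStateType enum are assumptions about the API of the version
in use; the class name and checkpoint URI are placeholders). If in doubt,
setting the option in flink-conf.yaml, as above, is sufficient.

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class HeapTimersSetup {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder checkpoint location; use your own durable storage path.
        RocksDBStateBackend backend = new RocksDBStateBackend("hdfs:///flink/checkpoints", true);

        // Programmatic equivalent of the flink-conf.yaml entry discussed in this thread:
        //   state.backend.rocksdb.timer-service.factory: HEAP
        // Assumption: this setter is available in the Flink version in use;
        // otherwise configure the option in flink-conf.yaml instead.
        backend.setPriorityQueueStateType(RocksDBStateBackend.PriorityQueueStateType.HEAP);

        env.setStateBackend(backend);

        // Timers now live on the JVM heap (see point 3 above): watch TaskManager
        // memory usage and increase memory resources if necessary.

        // ... define the rest of the job and call env.execute(...) as usual.
    }
}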



Re: Error while deserializing the element

Posted by JING ZHANG <be...@gmail.com>.
Hi Vijay, Yun,
I've created a JIRA https://issues.apache.org/jira/browse/FLINK-23886 to
track this.

Best,
JING ZHANG


Re: Error while deserializing the element

Posted by JING ZHANG <be...@gmail.com>.
Hi Vijay,
I have encountered the same problem several times in online production
Flink jobs, but I have not found the root cause of the exception yet.
We have worked around the exception by adding the following parameter;
I hope it helps you:
state.backend.rocksdb.timer-service.factory: HEAP

I would like to invite Yun Tang, who is an expert on this topic, to look
into the problem; we could also create a JIRA to track the issue. See [1]
for the documentation on heap-based vs. RocksDB-based timers.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/state/state_backends/#timers-heap-vs-rocksdb

Best,
JING ZHANG
