Posted to user@flink.apache.org by Clemens Valiente <cl...@grab.com> on 2023/02/03 08:52:49 UTC
EOFException when deserializing from RocksDB
Hi, I have been struggling with this particular Exception for days and
thought I'd ask for help here.
I am using a KeyedProcessFunction with the following state declaration:
    private lazy val state: ValueState[Feature] = {
      val stateDescriptor = new ValueStateDescriptor[Feature](
        "CollectFeatureProcessState", createTypeInformation[Feature])
      getRuntimeContext.getState(stateDescriptor)
    }
The state is used in my process function as follows:
    override def processElement(
        value: Feature,
        ctx: KeyedProcessFunction[String, Feature, Feature]#Context,
        out: Collector[Feature]): Unit = {
      val current: Feature = state.value match {
        case null => value
        case exists => combine(value, exists)
      }
      if (checkForCompleteness(current)) {
        out.collect(current)
        state.clear()
      } else {
        state.update(current)
      }
    }
Feature is a protobuf class that I registered with Kryo as follows (using
chill-protobuf):
    env.getConfig.registerTypeWithKryoSerializer(
      classOf[Feature], classOf[ProtobufSerializer])
I also got exceptions with plain Scala case classes wrapping this Feature
class, and without the ProtobufSerializer, using the standard (slow) Java
serializer.
The exception occurs within the first minutes/seconds of starting the app
and looks as follows:
2023-02-03 08:41:07,577 WARN org.apache.flink.runtime.taskmanager.Task
[] - KeyedProcess -> (Map -> Sink: FeatureSignalSink, Map ->
Flat Map -> Sink: FeatureStore, Sink: logsink) (2/2)#0
(1befbd4d8975833fc973fc080ea866e4) switched from RUNNING to FAILED with
failure cause: org.apache.flink.util.FlinkRuntimeException: Error while
retrieving data from RocksDB.
    at org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:91)
    at com.grab.app.functions.stream.CollectFeatureProcessFunction$.processElement(CollectFeatureProcessFunction.scala:55)
    at com.grab.app.functions.stream.CollectFeatureProcessFunction$.processElement(CollectFeatureProcessFunction.scala:17)
    at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.io.EOFException
    at org.apache.flink.core.memory.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:329)
    at org.apache.flink.types.StringValue.readString(StringValue.java:786)
    at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:73)
    at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:31)
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:123)
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:32)
    at org.apache.flink.api.scala.typeutils.TraversableSerializer.deserialize(TraversableSerializer.scala:128)
    at org.apache.flink.api.scala.typeutils.TraversableSerializer.deserialize(TraversableSerializer.scala:34)
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:123)
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:32)
    at org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:89)
    ... 16 more
The exception is thrown at
com.grab.app.functions.stream.CollectFeatureProcessFunction$.processElement(CollectFeatureProcessFunction.scala:55)
which is this line:
val current: AcornHydraeventFeature = state.value match {
Has anyone run into this before, or can someone point me in the right
direction for further investigation?
Thanks a lot
Clemens
--
By communicating with Grab Holdings Limited and/or its subsidiaries,
associate companies and jointly controlled entities (collectively, “Grab”),
you are deemed to have consented to the processing of your personal data as
set out in the Privacy Notice which can be viewed at
https://grab.com/privacy/ <https://grab.com/privacy/>
This email
contains confidential information that may be privileged and is only for
the intended recipient(s). If you are not the intended recipient(s), please
do not disseminate, distribute or copy this email. Please notify Grab
immediately if you have received this by mistake and delete this email from
your system. Email transmission may not be secure or error-free as any
information could be intercepted, corrupted, lost, destroyed, delayed or
incomplete, or contain viruses. Grab does not accept liability for any
errors or omissions in this email that arise as a result of email
transmission. All intellectual property rights in this email and any
attachments shall remain vested in Grab, unless otherwise provided by law
Re: EOFException when deserializing from RocksDB
Posted by Clemens Valiente <cl...@grab.com>.
If I store the Java protobuf objects in RocksDB instead of the Scala
objects, I get this stack trace:
2023-02-07 09:17:04,246 WARN org.apache.flink.runtime.taskmanager.Task
[] - KeyedProcess -> (Map -> Sink: signalSink, Map -> Flat
Map -> Sink: FeatureSink, Sink: logsink) (2/2)#0
(fa4aae8fb7d2a7a94eafb36fe5470851_6760a9723a5626620871f040128bad1b_1_0)
switched from RUNNING to FAILED with failure cause:
org.apache.flink.util.FlinkRuntimeException: Error while adding data to
RocksDB
    at org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:109)
    at com.grab.grabdefence.acorn.app.functions.stream.CollectFeatureProcessFunction$.processElement(CollectFeatureProcessFunction.scala:69)
    at com.grab.grabdefence.acorn.app.functions.stream.CollectFeatureProcessFunction$.processElement(CollectProcessFunction.scala:18)
    at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.IllegalStateException: The Kryo Output still contains
data from a previous serialize call. It has to be flushed or cleared at the
end of the serialize call.
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:358)
    at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValueInternal(AbstractRocksDBState.java:158)
    at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValue(AbstractRocksDBState.java:180)
    at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValue(AbstractRocksDBState.java:168)
    at org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:107)
    ... 16 more
I do not touch the Kryo serializer apart from the one
registerTypeWithKryoSerializer call, and I call state.value() and update()
only once each in the processElement() method.
I thought these state stores were abstracted away safely enough that, as a
user, I would not have to worry about the exact flush, serialization and
deserialization logic. Why does this application break even though I am
only using what I think is fairly innocent code?
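
One thing the stack traces hint at, offered as an unconfirmed assumption
rather than a diagnosis: the frames are on CollectFeatureProcessFunction$,
which is how a Scala `object` appears on the JVM. If the function really is
declared as an `object`, parallel subtasks running in the same TaskManager
JVM may share the single instance and its lazily initialised state handle,
and concurrent use of one serializer could produce errors like the two
above. Below is a minimal sketch of the same logic as a `class` that
acquires its state in open(). The Feature case class and the FeatureOps
helpers are hypothetical stand-ins, since the real Feature, combine and
checkForCompleteness are not shown in this thread.

```scala
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

// Hypothetical stand-in for the protobuf-backed Feature type in the thread.
final case class Feature(key: String, parts: Map[String, String])

// Hypothetical helpers; the real implementations are not shown in the thread.
object FeatureOps {
  // Merge the parts of both features, preferring values from `a` on conflict.
  def combine(a: Feature, b: Feature): Feature = a.copy(parts = b.parts ++ a.parts)

  // Assumed rule for illustration only: complete once two parts are present.
  def checkForCompleteness(f: Feature): Boolean = f.parts.size >= 2
}

// A class rather than an object, so each parallel subtask has its own instance.
class CollectFeatureProcessFunction
    extends KeyedProcessFunction[String, Feature, Feature] {

  // Assigned in open(), which runs on each function instance before any
  // element is processed, so the handle belongs to that instance only.
  @transient private var state: ValueState[Feature] = _

  override def open(parameters: Configuration): Unit = {
    val stateDescriptor = new ValueStateDescriptor[Feature](
      "CollectFeatureProcessState", createTypeInformation[Feature])
    state = getRuntimeContext.getState(stateDescriptor)
  }

  override def processElement(
      value: Feature,
      ctx: KeyedProcessFunction[String, Feature, Feature]#Context,
      out: Collector[Feature]): Unit = {
    // state.value() returns null when nothing is stored for the current key.
    val current = Option(state.value()).fold(value)(FeatureOps.combine(value, _))
    if (FeatureOps.checkForCompleteness(current)) {
      out.collect(current)
      state.clear()
    } else {
      state.update(current)
    }
  }
}
```

With this shape the job would pass a fresh instance, for example
stream.keyBy(_.key).process(new CollectFeatureProcessFunction), where
`stream` is an assumed DataStream[Feature].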