Posted to user@flink.apache.org by Clemens Valiente <cl...@grab.com> on 2023/02/03 08:52:49 UTC

EOFException when deserializing from RocksDB

Hi, I have been struggling with this particular Exception for days and
thought I'd ask for help here.

I am using a KeyedProcessFunction with the following state:

  private lazy val state: ValueState[Feature] = {
    val stateDescriptor = new ValueStateDescriptor[Feature](
      "CollectFeatureProcessState", createTypeInformation[Feature])
    getRuntimeContext.getState(stateDescriptor)
  }


which is used in my process function as follows

  override def processElement(
      value: Feature,
      ctx: KeyedProcessFunction[String, Feature, Feature]#Context,
      out: Collector[Feature]): Unit = {
    val current: Feature = state.value match {
      case null => value
      case exists => combine(value, exists)
    }
    if (checkForCompleteness(current)) {
      out.collect(current)
      state.clear()
    } else {
      state.update(current)
    }
  }
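
To make the intended merge-until-complete logic concrete, here is a minimal plain-Scala sketch that runs without Flink. Everything in it is an assumption made for illustration: the `Feature` case class with its `id` and `parts` fields, the bodies of `combine` and `checkForCompleteness`, and the mutable map standing in for keyed `ValueState`. It is not the code from the job.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the protobuf Feature class.
final case class Feature(id: String, parts: Set[String])

object CollectFeatureModel {
  // Hypothetical rule: a Feature is complete once it has all of these parts.
  val requiredParts: Set[String] = Set("a", "b", "c")

  // In-memory stand-in for Flink's keyed ValueState (one slot per key).
  private val state = mutable.Map.empty[String, Feature]

  // Hypothetical merge: union the parts of the new and the stored Feature.
  def combine(value: Feature, exists: Feature): Feature =
    value.copy(parts = value.parts ++ exists.parts)

  def checkForCompleteness(f: Feature): Boolean =
    requiredParts.subsetOf(f.parts)

  // Mirrors processElement: returns Some(feature) when it would be emitted.
  def process(key: String, value: Feature): Option[Feature] = {
    val current = state.get(key) match {
      case None         => value
      case Some(exists) => combine(value, exists)
    }
    if (checkForCompleteness(current)) {
      state.remove(key)          // corresponds to state.clear()
      Some(current)              // corresponds to out.collect(current)
    } else {
      state.update(key, current) // corresponds to state.update(current)
      None
    }
  }
}
```

In this model, partial features for a key accumulate until the combined value is complete, at which point it is returned and the slot for that key is emptied.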


Feature is a protobuf class that I registered with Kryo as follows (using
chill-protobuf):

env.getConfig.registerTypeWithKryoSerializer(
  classOf[Feature], classOf[ProtobufSerializer])

I also got exceptions with plain Scala case classes wrapping this Feature
class, and when dropping the ProtobufSerializer in favour of the standard
(slow) Java serializer.
The exception occurs within the first seconds to minutes after starting the
app and looks as follows:

2023-02-03 08:41:07,577 WARN  org.apache.flink.runtime.taskmanager.Task [] - KeyedProcess -> (Map -> Sink: FeatureSignalSink, Map -> Flat Map -> Sink: FeatureStore, Sink: logsink) (2/2)#0 (1befbd4d8975833fc973fc080ea866e4) switched from RUNNING to FAILED with failure cause: org.apache.flink.util.FlinkRuntimeException: Error while retrieving data from RocksDB.
    at org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:91)
    at com.grab.app.functions.stream.CollectFeatureProcessFunction$.processElement(CollectFeatureProcessFunction.scala:55)
    at com.grab.app.functions.stream.CollectFeatureProcessFunction$.processElement(CollectFeatureProcessFunction.scala:17)
    at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.io.EOFException
    at org.apache.flink.core.memory.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:329)
    at org.apache.flink.types.StringValue.readString(StringValue.java:786)
    at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:73)
    at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:31)
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:123)
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:32)
    at org.apache.flink.api.scala.typeutils.TraversableSerializer.deserialize(TraversableSerializer.scala:128)
    at org.apache.flink.api.scala.typeutils.TraversableSerializer.deserialize(TraversableSerializer.scala:34)
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:123)
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:32)
    at org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:89)
    ... 16 more

The exception is thrown at
com.grab.app.functions.stream.CollectFeatureProcessFunction$.processElement(CollectFeatureProcessFunction.scala:55)
which is this line:

val current: AcornHydraeventFeature = state.value match {
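
For readers unfamiliar with this failure mode: an `EOFException` raised inside a `deserialize` call generally means the reader tried to consume more bytes than the stored value contains, so the bytes do not match what the reading serializer expects. The sketch below reproduces that with plain `java.io` streams. It is not Flink's `StringSerializer`, and the object and method names are made up for illustration.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.io.{DataInputStream, DataOutputStream, EOFException}

object TruncatedReadDemo {
  // Serialize a string with a length prefix, as DataOutputStream.writeUTF does.
  def write(s: String): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    out.writeUTF(s)
    out.flush()
    bytes.toByteArray
  }

  // Right(value) on success, Left(exception) if the bytes run out early.
  def read(bytes: Array[Byte]): Either[EOFException, String] = {
    val in = new DataInputStream(new ByteArrayInputStream(bytes))
    try Right(in.readUTF())
    catch { case e: EOFException => Left(e) }
  }
}
```

Reading back the full byte array succeeds, while reading an array with its last few bytes dropped fails with `EOFException`, because the length prefix promises more data than is present.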


Has anyone run into this before, or can someone point me in the right
direction for further investigation?

Thanks a lot
Clemens

-- 


By communicating with Grab Holdings Limited and/or its subsidiaries, 
associate companies and jointly controlled entities (collectively, “Grab”), 
you are deemed to have consented to the processing of your personal data as 
set out in the Privacy Notice which can be viewed at 
https://grab.com/privacy/  <https://grab.com/privacy/>


 This email 
contains confidential information that may be privileged and is only for 
the intended recipient(s). If you are not the intended recipient(s), please 
do not disseminate, distribute or copy this email. Please notify Grab 
immediately if you have received this by mistake and delete this email from 
your system. Email transmission may not be secure or error-free as any 
information could be intercepted, corrupted, lost, destroyed, delayed or 
incomplete, or contain viruses. Grab does not accept liability for any 
errors or omissions in this email that arise as a result of email 
transmission. All intellectual property rights in this email and any 
attachments shall remain vested in Grab, unless otherwise provided by law


Re: EOFException when deserializing from RocksDB

Posted by Clemens Valiente <cl...@grab.com>.
If I store the Java protobuf objects in RocksDB instead of the Scala
objects, I get this stack trace:

2023-02-07 09:17:04,246 WARN  org.apache.flink.runtime.taskmanager.Task [] - KeyedProcess -> (Map -> Sink: signalSink, Map -> Flat Map -> Sink: FeatureSink, Sink: logsink) (2/2)#0 (fa4aae8fb7d2a7a94eafb36fe5470851_6760a9723a5626620871f040128bad1b_1_0) switched from RUNNING to FAILED with failure cause: org.apache.flink.util.FlinkRuntimeException: Error while adding data to RocksDB
    at org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:109)
    at com.grab.grabdefence.acorn.app.functions.stream.CollectFeatureProcessFunction$.processElement(CollectFeatureProcessFunction.scala:69)
    at com.grab.grabdefence.acorn.app.functions.stream.CollectFeatureProcessFunction$.processElement(CollectProcessFunction.scala:18)
    at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.IllegalStateException: The Kryo Output still contains data from a previous serialize call. It has to be flushed or cleared at the end of the serialize call.
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:358)
    at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValueInternal(AbstractRocksDBState.java:158)
    at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValue(AbstractRocksDBState.java:180)
    at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValue(AbstractRocksDBState.java:168)
    at org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:107)
    ... 16 more

I do not touch the Kryo serializer apart from the one
registerTypeWithKryoSerializer call, and I only call state.value() and
state.update() once each in the processElement() method.

I thought these state stores were abstracted away safely enough that, as a
user, I would not have to worry about the exact
flush/serialization/deserialization logic, but it seems that this
application breaks even though I am only using what I think is quite
innocent code.
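
One thing that may be worth checking, offered as a hypothesis rather than a diagnosis: the `$` suffix in the `CollectFeatureProcessFunction$.processElement` frames is how the JVM names the class behind a Scala `object`. A `lazy val` inside an `object` is initialized once per JVM and then shared by every thread that touches it. If the process function is indeed declared as an `object`, parallel subtasks running in the same TaskManager could end up sharing a single state handle. The plain-Scala sketch below only demonstrates that sharing behaviour. `Handle`, `SingletonFunction`, `PerInstanceFunction` and `SharingDemo` are hypothetical names, and no Flink classes are involved.

```scala
import java.util.concurrent.{Callable, Executors}

// Hypothetical stand-in for a state handle that is not thread-safe.
final class Handle

// A Scala `object` is a JVM-wide singleton, so its lazy val exists once.
object SingletonFunction {
  lazy val state: Handle = new Handle
}

// A class gives every instance its own lazy val.
final class PerInstanceFunction {
  lazy val state: Handle = new Handle
}

object SharingDemo {
  // Evaluate `body` on two pooled threads and return both results.
  def onTwoThreads[A](body: () => A): (A, A) = {
    val pool = Executors.newFixedThreadPool(2)
    try {
      val task = new Callable[A] { def call(): A = body() }
      val first = pool.submit(task)
      val second = pool.submit(task)
      (first.get(), second.get())
    } finally pool.shutdown()
  }

  // True: both calls observe the very same Handle instance.
  def singletonShared: Boolean = {
    val (a, b) = onTwoThreads(() => SingletonFunction.state)
    a eq b
  }

  // False: each call builds its own function and so gets its own Handle.
  def perInstanceShared: Boolean = {
    val (a, b) = onTwoThreads(() => new PerInstanceFunction().state)
    a eq b
  }
}
```

If this turns out to apply, declaring the function as a `class`, so that each parallel instance obtains its own state handle, would be the first thing to try.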



