Posted to user@flink.apache.org by Shannon Carey <sc...@expedia.com> on 2016/09/28 17:28:09 UTC

Error while adding data to RocksDB: No more bytes left

It appears that when one of my jobs tries to checkpoint, the following exception is triggered. I am using Flink 1.1.1 in Scala 2.11. RocksDB checkpoints are being saved to S3.

java.lang.RuntimeException: Error while adding data to RocksDB
	at org.apache.flink.contrib.streaming.state.RocksDBFoldingState.add(RocksDBFoldingState.java:125)
	at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:382)
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:176)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:66)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:266)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.EOFException: No more bytes left.
	at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:77)
	at com.esotericsoftware.kryo.io.Input.readVarLong(Input.java:690)
	at com.esotericsoftware.kryo.io.Input.readLong(Input.java:685)
	at com.esotericsoftware.kryo.serializers.UnsafeCacheFields$UnsafeLongField.read(UnsafeCacheFields.java:160)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:232)
	at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:113)
	at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:30)
	at org.apache.flink.contrib.streaming.state.RocksDBFoldingState.add(RocksDBFoldingState.java:118)
	... 6 more

Thanks for any help!

Shannon

Re: Error while adding data to RocksDB: No more bytes left

Posted by Stephan Ewen <se...@apache.org>.
@Shannon Concerning the issue with long checkpoints even though the
snapshot is very short:

I found a critical issue with the Flink Kafka 0.9 Consumer - on
low-throughput topics/partitions, it can lock up for a while, preventing
checkpoints from being triggered (barriers from being injected).

A fix is going in now; a 1.1.3 release containing it will probably follow soon.

On Thu, Sep 29, 2016 at 9:12 PM, Shannon Carey <sc...@expedia.com> wrote:

> Hi Stephan!
>
> The failure appeared to occur every 10 minutes, which is also the interval
> for checkpointing. However, I agree with you that the stack trace appears
> to be independent. Could this perhaps be an issue with multithreading,
> where the checkpoint mechanism is somehow interfering with ongoing
> operation of the state backend? I've never seen this problem until now, so
> I am a little suspicious that it might be due to something in my code, but
> so far it's been difficult to figure out what that might be.
>
> I am using the default, SemiAsync snapshot mode.
>
> The classes of the data flow are a bit too large to put here in their
> entirety. We are using Scala case classes, Java classes generated by Avro,
> Tuples, Scala Option, java.util.UUID and Scala mutable.Map. The majority of
> these classes have been operational in other jobs before. I added a unit
> test for the class which contains a mutable.Map to see whether that was
> causing a problem. Does this look like a reasonable unit test to verify
> Flink serializability to you?
>
> it("roundtrip serializes in Flink") {
>   val millis: Long = TimeUnit.DAYS.toMillis(2)
>   val original: PreferredAirportDailySum = new PreferredAirportDailySum(millis)
>   original.add("a", TimestampedAirportCount(4, 6))
>   original.add("b", TimestampedAirportCount(7, 8))
>
>   val deserialized: PreferredAirportDailySum = serializationRoundTrip(original, 100)
>
>   deserialized.timestamp shouldBe millis
>   deserialized.immutableItems("a") shouldBe TimestampedAirportCount(4, 6)
>   deserialized.immutableItems("b") shouldBe TimestampedAirportCount(7, 8)
> }
>
> def serializationRoundTrip[T : ClassTag : TypeInformation](original: T, expectedMaxBytes: Int): T = {
>   val typeInfo = implicitly[TypeInformation[T]]
>   val serializer: TypeSerializer[T] = typeInfo.createSerializer(new ExecutionConfig)
>
>   val out: ByteArrayOutputStream = new ByteArrayOutputStream(expectedMaxBytes)
>   val outputView: DataOutputView = new DataOutputViewStreamWrapper(out)
>   serializer.serialize(original, outputView)
>
>   out.size() should be <= expectedMaxBytes
>
>   val inputView: DataInputViewStreamWrapper =
>     new DataInputViewStreamWrapper(new ByteArrayInputStream(out.toByteArray))
>   val deserialized: T = serializer.deserialize(inputView)
>
>   deserialized
> }
>
> I tried running my job in a local one-slot cluster with RocksDB enabled
> but checkpointing to local filesystem. Similar errors occur, but are more
> sporadic. I have not yet been able to capture the error while debugging,
> but if I do I will provide additional information.
>
> I noticed that locally, execution only reaches
> DefaultClassResolver#readName(Input)->Class.forName() when a checkpoint
> completes. Also, the timing of checkpointing is a bit odd: in the example
> below the checkpoint takes 200s to complete after being triggered even
> though RocksDB reports that it only took ~100ms.
>
> 2016-09-29 12:56:17,619 INFO  CheckpointCoordinator     - Triggering
> checkpoint 2 @ 1475171777619
> 2016-09-29 12:59:38,079 INFO  RocksDBStateBackend  - RocksDB
> (/var/folders/…./WindowOperator_38_0/dummy_state/730773a2-bb33-4021-aa9e-9b4e3cb172f3/db)
> backup (synchronous part) took 7 ms.
> 2016-09-29 12:59:38,214 INFO  RocksDBStateBackend  - RocksDB
> materialization from /var/folders/…/WindowOperator_
> 38_0/dummy_state/730773a2-bb33-4021-aa9e-9b4e3cb172f3/local-chk-2 to
> file:/var/flinkstate/…/WindowOperator_38_0/dummy_state/chk-2
> (asynchronous part) took 96 ms.
> 2016-09-29 12:59:38,333 INFO  CheckpointCoordinator     - Completed
> checkpoint 2 (in 200621 ms)
>
> Do you have any other advice?
>
> Exceptions from local execution:
>
> java.lang.RuntimeException: Error while adding data to RocksDB
> at org.apache.flink.contrib.streaming.state.RocksDBFoldingState.add(
> RocksDBFoldingState.java:125)
> at org.apache.flink.streaming.runtime.operators.windowing.
> WindowOperator.processElement(WindowOperator.java:382)
> at org.apache.flink.streaming.runtime.io.StreamInputProcessor.
> processInput(StreamInputProcessor.java:176)
> at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(
> OneInputStreamTask.java:66)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:266)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: com.esotericsoftware.kryo.KryoException: Unable to find class:
> 'CLE
> at com.esotericsoftware.kryo.util.DefaultClassResolver.
> readName(DefaultClassResolver.java:138)
> at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(
> DefaultClassResolver.java:115)
> at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
> at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(
> CollectionSerializer.java:116)
> at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(
> CollectionSerializer.java:22)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
> at org.apache.flink.api.java.typeutils.runtime.kryo.
> KryoSerializer.deserialize(KryoSerializer.java:232)
> at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(
> CaseClassSerializer.scala:113)
> at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(
> CaseClassSerializer.scala:30)
> at org.apache.flink.contrib.streaming.state.RocksDBFoldingState.add(
> RocksDBFoldingState.java:118)
> ... 6 more
> Caused by: java.lang.ClassNotFoundException: 'CLE
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:348)
> at com.esotericsoftware.kryo.util.DefaultClassResolver.
> readName(DefaultClassResolver.java:136)
> ... 16 more
>
> After that one happened, this one happened many times:
>
> java.lang.RuntimeException: Failed to deserialize state handle and setup
> initial operator state.
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: Unable to deserialize default value.
> at org.apache.flink.api.common.state.StateDescriptor.
> readObject(StateDescriptor.java:285)
> at sun.reflect.GeneratedMethodAccessor34.invoke(Unknown Source)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(
> DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
> at java.io.ObjectInputStream.readOrdinaryObject(
> ObjectInputStream.java:1801)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
> at java.util.ArrayList.readObject(ArrayList.java:791)
> at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(
> DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
> at java.io.ObjectInputStream.readOrdinaryObject(
> ObjectInputStream.java:1801)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at java.io.ObjectInputStream.defaultReadFields(
> ObjectInputStream.java:2000)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
> at java.io.ObjectInputStream.readOrdinaryObject(
> ObjectInputStream.java:1801)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
> at java.util.HashMap.readObject(HashMap.java:1396)
> at sun.reflect.GeneratedMethodAccessor9.invoke(Unknown Source)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(
> DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
> at java.io.ObjectInputStream.readOrdinaryObject(
> ObjectInputStream.java:1801)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at java.io.ObjectInputStream.defaultReadFields(
> ObjectInputStream.java:2000)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
> at java.io.ObjectInputStream.readOrdinaryObject(
> ObjectInputStream.java:1801)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1707)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1345)
> at java.io.ObjectInputStream.defaultReadFields(
> ObjectInputStream.java:2000)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
> at java.io.ObjectInputStream.readOrdinaryObject(
> ObjectInputStream.java:1801)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
> at org.apache.flink.util.InstantiationUtil.deserializeObject(
> InstantiationUtil.java:291)
> at org.apache.flink.util.SerializedValue.deserializeValue(
> SerializedValue.java:58)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:542)
> ... 1 more
> Caused by: java.lang.StringIndexOutOfBoundsException: String index out of
> range: -2
> at java.lang.String.<init>(String.java:196)
> at com.esotericsoftware.kryo.io.Input.readString(Input.java:466)
> at com.esotericsoftware.kryo.util.DefaultClassResolver.
> readName(DefaultClassResolver.java:132)
> at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(
> DefaultClassResolver.java:115)
> at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
> at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(
> CollectionSerializer.java:116)
> at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(
> CollectionSerializer.java:22)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
> at org.apache.flink.api.java.typeutils.runtime.kryo.
> KryoSerializer.deserialize(KryoSerializer.java:232)
> at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(
> CaseClassSerializer.scala:113)
> at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(
> CaseClassSerializer.scala:30)
> at org.apache.flink.api.common.state.StateDescriptor.
> readObject(StateDescriptor.java:282)
> ... 44 more
>
> During another execution, this one occurred several times:
>
> java.lang.RuntimeException: Failed to deserialize state handle and setup
> initial operator state.
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: Unable to deserialize default value.
> at org.apache.flink.api.common.state.StateDescriptor.
> readObject(StateDescriptor.java:285)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(
> NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(
> DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
> at java.io.ObjectInputStream.readOrdinaryObject(
> ObjectInputStream.java:1801)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
> at java.util.ArrayList.readObject(ArrayList.java:791)
> at sun.reflect.GeneratedMethodAccessor42.invoke(Unknown Source)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(
> DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
> at java.io.ObjectInputStream.readOrdinaryObject(
> ObjectInputStream.java:1801)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at java.io.ObjectInputStream.defaultReadFields(
> ObjectInputStream.java:2000)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
> at java.io.ObjectInputStream.readOrdinaryObject(
> ObjectInputStream.java:1801)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
> at java.util.HashMap.readObject(HashMap.java:1396)
> at sun.reflect.GeneratedMethodAccessor41.invoke(Unknown Source)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(
> DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
> at java.io.ObjectInputStream.readOrdinaryObject(
> ObjectInputStream.java:1801)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at java.io.ObjectInputStream.defaultReadFields(
> ObjectInputStream.java:2000)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
> at java.io.ObjectInputStream.readOrdinaryObject(
> ObjectInputStream.java:1801)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1707)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1345)
> at java.io.ObjectInputStream.defaultReadFields(
> ObjectInputStream.java:2000)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
> at java.io.ObjectInputStream.readOrdinaryObject(
> ObjectInputStream.java:1801)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
> at org.apache.flink.util.InstantiationUtil.deserializeObject(
> InstantiationUtil.java:291)
> at org.apache.flink.util.SerializedValue.deserializeValue(
> SerializedValue.java:58)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:542)
> ... 1 more
> Caused by: com.esotericsoftware.kryo.KryoException: Unable to find class:
> #
> at com.esotericsoftware.kryo.util.DefaultClassResolver.
> readName(DefaultClassResolver.java:138)
> at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(
> DefaultClassResolver.java:115)
> at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
> at org.apache.flink.api.java.typeutils.runtime.kryo.
> KryoSerializer.deserialize(KryoSerializer.java:232)
> at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(
> CaseClassSerializer.scala:113)
> at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(
> CaseClassSerializer.scala:30)
> at org.apache.flink.api.common.state.StateDescriptor.
> readObject(StateDescriptor.java:282)
> ... 45 more
> Caused by: java.lang.ClassNotFoundException: #
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:348)
> at com.esotericsoftware.kryo.util.DefaultClassResolver.
> readName(DefaultClassResolver.java:136)
> ... 52 more
>
>
>
> From: Stephan Ewen <se...@apache.org>
> Date: Wednesday, September 28, 2016 at 1:18 PM
> To: <us...@flink.apache.org>
> Subject: Re: Error while adding data to RocksDB: No more bytes left
>
> Hi Shannon!
>
> The stack trace you pasted is independent of checkpointing - it seems to
> be from the regular processing. Does this only happen when checkpoints are
> activated?
>
> Can you also share which checkpoint method you use?
>   - FullyAsynchronous
>   - SemiAsynchronous
>
> I think there are two possibilities for what can happen:
>   - There is a serialization inconsistency in the Serializers. If that is
> the case, this error should occur almost in a deterministic fashion. To
> debug that, it would be good to know which data types you are using.
>   - There is a bug in RocksDB (or Flink's wrapping of it) where data gets
> corrupted when using the snapshot feature. That would explain why this only
> occurs when checkpoints are happening.
>
> Greetings,
> Stephan
>
>
> On Wed, Sep 28, 2016 at 7:28 PM, Shannon Carey <sc...@expedia.com> wrote:
>
>> It appears that when one of my jobs tries to checkpoint, the following
>> exception is triggered. I am using Flink 1.1.1 in Scala 2.11. RocksDB
>> checkpoints are being saved to S3.
>>
>> java.lang.RuntimeException: Error while adding data to RocksDB
>>         at org.apache.flink.contrib.streaming.state.RocksDBFoldingState
>> .add(RocksDBFoldingState.java:125)
>>         at org.apache.flink.streaming.runtime.operators.windowing.Windo
>> wOperator.processElement(WindowOperator.java:382)
>>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor.p
>> rocessInput(StreamInputProcessor.java:176)
>>         at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.
>> run(OneInputStreamTask.java:66)
>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
>> StreamTask.java:266)
>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>>         at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.io.EOFException: No more bytes left.
>>         at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.
>> require(NoFetchingInput.java:77)
>>         at com.esotericsoftware.kryo.io.Input.readVarLong(Input.java:690)
>>         at com.esotericsoftware.kryo.io.Input.readLong(Input.java:685)
>>         at com.esotericsoftware.kryo.serializers.UnsafeCacheFields$Unsa
>> feLongField.read(UnsafeCacheFields.java:160)
>>         at com.esotericsoftware.kryo.serializers.FieldSerializer.read(
>> FieldSerializer.java:528)
>>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:
>> 761)
>>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSeriali
>> zer.deserialize(KryoSerializer.java:232)
>>         at org.apache.flink.api.scala.typeutils.CaseClassSerializer.des
>> erialize(CaseClassSerializer.scala:113)
>>         at org.apache.flink.api.scala.typeutils.CaseClassSerializer.des
>> erialize(CaseClassSerializer.scala:30)
>>         at org.apache.flink.contrib.streaming.state.RocksDBFoldingState
>> .add(RocksDBFoldingState.java:118)
>>         ... 6 more
>>
>> Thanks for any help!
>>
>> Shannon
>>
>
>

Re: Error while adding data to RocksDB: No more bytes left

Posted by Shannon Carey <sc...@expedia.com>.
Implementing a custom serialization approach with Flink's CopyableValue (instead of relying on Flink to automatically use Kryo) solved the issue. As a side benefit, this also reduced the serialized size of my object by about half.
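For anyone hitting the same thing, a value type based on CopyableValue looks roughly like the sketch below. The class and field names are simplified placeholders rather than my actual PreferredAirportDailySum/TimestampedAirportCount code, and the overridden methods are just the ones the CopyableValue/Value interfaces declare:

import java.io.IOException

import org.apache.flink.core.memory.{DataInputView, DataOutputView}
import org.apache.flink.types.CopyableValue

// Placeholder value type with a fixed binary layout: two longs.
class TimestampedCount(var timestamp: Long, var count: Long)
  extends CopyableValue[TimestampedCount] {

  // Value types are instantiated reflectively, so keep a no-arg constructor.
  def this() = this(0L, 0L)

  override def getBinaryLength(): Int = 16

  @throws[IOException]
  override def write(out: DataOutputView): Unit = {
    out.writeLong(timestamp)
    out.writeLong(count)
  }

  @throws[IOException]
  override def read(in: DataInputView): Unit = {
    timestamp = in.readLong()
    count = in.readLong()
  }

  override def copyTo(target: TimestampedCount): Unit = {
    target.timestamp = timestamp
    target.count = count
  }

  override def copy(): TimestampedCount = new TimestampedCount(timestamp, count)

  @throws[IOException]
  override def copy(source: DataInputView, target: DataOutputView): Unit =
    target.write(source, getBinaryLength)
}

Because the binary layout is fixed and no class names get embedded, the serialized form stays compact, which matches the size reduction mentioned above.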


From: Stephan Ewen <se...@apache.org>
Date: Friday, September 30, 2016 at 3:58 AM
To: <us...@flink.apache.org>
Cc: Stephan Ewen <se...@apache.org>
Subject: Re: Error while adding data to RocksDB: No more bytes left

Agree with Stefan, let's see if the fully async snapshot mode helps. It looks suspiciously RocksDB related...

On Fri, Sep 30, 2016 at 10:30 AM, Stefan Richter <s....@data-artisans.com> wrote:
Hi Shannon,

from your new stack trace and the bogus class names, I agree with Stephan that either serialization or the database itself is corrupted in some way. Could you please check if this problem only happens if checkpointing is enabled? If yes, does switching to fully async snapshots change the behavior?

Best,
Stefan

On 29.09.2016 at 21:12, Shannon Carey <sc...@expedia.com> wrote:

Hi Stephan!

The failure appeared to occur every 10 minutes, which is also the interval for checkpointing. However, I agree with you that the stack trace appears to be independent. Could this perhaps be an issue with multithreading, where the checkpoint mechanism is somehow interfering with ongoing operation of the state backend? I've never seen this problem until now, so I am a little suspicious that it might be due to something in my code, but so far it's been difficult to figure out what that might be.

I am using the default, SemiAsync snapshot mode.
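The relevant setup is along these lines (a minimal sketch; the S3 path below is a placeholder, and the snapshot mode is simply left at its default):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Checkpoint every 10 minutes.
env.enableCheckpointing(10 * 60 * 1000L)

// RocksDB state backend writing checkpoints to S3 (placeholder path),
// with the default (semi-asynchronous) snapshot mode.
env.setStateBackend(new RocksDBStateBackend("s3://some-bucket/flink/checkpoints"))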

The classes of the data flow are a bit too large to put here in their entirety. We are using Scala case classes, Java classes generated by Avro, Tuples, Scala Option, java.util.UUID and Scala mutable.Map. The majority of these classes have been operational in other jobs before. I added a unit test for the class which contains a mutable.Map to see whether that was causing a problem. Does this look like a reasonable unit test to verify Flink serializability to you?

it("roundtrip serializes in Flink") {
  val millis: Long = TimeUnit.DAYS.toMillis(2)
  val original: PreferredAirportDailySum = new PreferredAirportDailySum(millis)
  original.add("a", TimestampedAirportCount(4, 6))
  original.add("b", TimestampedAirportCount(7, 8))

  val deserialized: PreferredAirportDailySum = serializationRoundTrip(original, 100)

  deserialized.timestamp shouldBe millis
  deserialized.immutableItems("a") shouldBe TimestampedAirportCount(4, 6)
  deserialized.immutableItems("b") shouldBe TimestampedAirportCount(7, 8)
}

def serializationRoundTrip[T : ClassTag : TypeInformation](original: T, expectedMaxBytes: Int): T = {
  val typeInfo = implicitly[TypeInformation[T]]
  val serializer: TypeSerializer[T] = typeInfo.createSerializer(new ExecutionConfig)

  val out: ByteArrayOutputStream = new ByteArrayOutputStream(expectedMaxBytes)
  val outputView: DataOutputView = new DataOutputViewStreamWrapper(out)
  serializer.serialize(original, outputView)

  out.size() should be <= expectedMaxBytes

  val inputView: DataInputViewStreamWrapper =
    new DataInputViewStreamWrapper(new ByteArrayInputStream(out.toByteArray))
  val deserialized: T = serializer.deserialize(inputView)

  deserialized
}

I tried running my job in a local one-slot cluster with RocksDB enabled but checkpointing to local filesystem. Similar errors occur, but are more sporadic. I have not yet been able to capture the error while debugging, but if I do I will provide additional information.

I noticed that locally, execution only reaches DefaultClassResolver#readName(Input)->Class.forName() when a checkpoint completes. Also, the timing of checkpointing is a bit odd: in the example below the checkpoint takes 200s to complete after being triggered even though RocksDB reports that it only took ~100ms.

2016-09-29 12:56:17,619 INFO  CheckpointCoordinator     - Triggering checkpoint 2 @ 1475171777619
2016-09-29 12:59:38,079 INFO  RocksDBStateBackend  - RocksDB (/var/folders/…./WindowOperator_38_0/dummy_state/730773a2-bb33-4021-aa9e-9b4e3cb172f3/db) backup (synchronous part) took 7 ms.
2016-09-29 12:59:38,214 INFO  RocksDBStateBackend  - RocksDB materialization from /var/folders/…/WindowOperator_38_0/dummy_state/730773a2-bb33-4021-aa9e-9b4e3cb172f3/local-chk-2 to file:/var/flinkstate/…/WindowOperator_38_0/dummy_state/chk-2 (asynchronous part) took 96 ms.
2016-09-29 12:59:38,333 INFO  CheckpointCoordinator     - Completed checkpoint 2 (in 200621 ms)

Do you have any other advice?

Exceptions from local execution:

java.lang.RuntimeException: Error while adding data to RocksDB
at org.apache.flink.contrib.streaming.state.RocksDBFoldingState.add(RocksDBFoldingState.java:125)
at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:382)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:176)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:66)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:266)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
Caused by: com.esotericsoftware.kryo.KryoException: Unable to find class: 'CLE
at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:232)
at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:113)
at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:30)
at org.apache.flink.contrib.streaming.state.RocksDBFoldingState.add(RocksDBFoldingState.java:118)
... 6 more
Caused by: java.lang.ClassNotFoundException: 'CLE
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)
... 16 more

After that one happened, this one happened many times:

java.lang.RuntimeException: Failed to deserialize state handle and setup initial operator state.
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Unable to deserialize default value.
at org.apache.flink.api.common.state.StateDescriptor.readObject(StateDescriptor.java:285)
at sun.reflect.GeneratedMethodAccessor34.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at java.util.ArrayList.readObject(ArrayList.java:791)
at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at java.util.HashMap.readObject(HashMap.java:1396)
at sun.reflect.GeneratedMethodAccessor9.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1707)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1345)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:291)
at org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:58)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:542)
... 1 more
Caused by: java.lang.StringIndexOutOfBoundsException: String index out of range: -2
at java.lang.String.<init>(String.java:196)
at com.esotericsoftware.kryo.io.Input.readString(Input.java:466)
at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:132)
at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:232)
at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:113)
at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:30)
at org.apache.flink.api.common.state.StateDescriptor.readObject(StateDescriptor.java:282)
... 44 more

During another execution, this one occurred several times:

java.lang.RuntimeException: Failed to deserialize state handle and setup initial operator state.
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Unable to deserialize default value.
at org.apache.flink.api.common.state.StateDescriptor.readObject(StateDescriptor.java:285)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at java.util.ArrayList.readObject(ArrayList.java:791)
at sun.reflect.GeneratedMethodAccessor42.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at java.util.HashMap.readObject(HashMap.java:1396)
at sun.reflect.GeneratedMethodAccessor41.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1707)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1345)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:291)
at org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:58)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:542)
... 1 more
Caused by: com.esotericsoftware.kryo.KryoException: Unable to find class: #
at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:232)
at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:113)
at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:30)
at org.apache.flink.api.common.state.StateDescriptor.readObject(StateDescriptor.java:282)
... 45 more
Caused by: java.lang.ClassNotFoundException: #
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)
... 52 more



From: Stephan Ewen <se...@apache.org>
Date: Wednesday, September 28, 2016 at 1:18 PM
To: <us...@flink.apache.org>
Subject: Re: Error while adding data to RocksDB: No more bytes left

Hi Shannon!

The stack trace you pasted is independent of checkpointing - it seems to be from the regular processing. Does this only happen when checkpoints are activated?

Can you also share which checkpoint method you use?
  - FullyAsynchronous
  - SemiAsynchronous

I think there are two possibilities for what can happen:
  - There is a serialization inconsistency in the Serializers. If that is the case, this error should occur almost in a deterministic fashion. To debug that, it would be good to know which data types you are using.
  - There is a bug in RocksDB (or Flink's wrapping of it) where data gets corrupted when using the snapshot feature. That would explain why this only occurs when checkpoints are happening.

Greetings,
Stephan


On Wed, Sep 28, 2016 at 7:28 PM, Shannon Carey <sc...@expedia.com> wrote:
It appears that when one of my jobs tries to checkpoint, the following exception is triggered. I am using Flink 1.1.1 in Scala 2.11. RocksDB checkpoints are being saved to S3.

java.lang.RuntimeException: Error while adding data to RocksDB
        at org.apache.flink.contrib.streaming.state.RocksDBFoldingState.add(RocksDBFoldingState.java:125)
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:382)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:176)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:66)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:266)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.EOFException: No more bytes left.
        at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:77)
        at com.esotericsoftware.kryo.io.Input.readVarLong(Input.java:690)
        at com.esotericsoftware.kryo.io.Input.readLong(Input.java:685)
        at com.esotericsoftware.kryo.serializers.UnsafeCacheFields$UnsafeLongField.read(UnsafeCacheFields.java:160)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:232)
        at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:113)
        at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:30)
        at org.apache.flink.contrib.streaming.state.RocksDBFoldingState.add(RocksDBFoldingState.java:118)
        ... 6 more

Thanks for any help!

Shannon




Re: Error while adding data to RocksDB: No more bytes left

Posted by Stephan Ewen <se...@apache.org>.
Agree with Stefan, let's see if the fully async snapshot mode helps. It
looks suspiciously RocksDB related...
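For reference, switching the backend to fully async snapshots would look
roughly like this (a sketch against the 1.1 RocksDB backend; please
double-check the enableFullyAsyncSnapshots() method name against your
version, and the S3 path is a placeholder):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment

val backend = new RocksDBStateBackend("s3://some-bucket/flink/checkpoints")
// Use fully asynchronous snapshots instead of the default semi-async mode.
backend.enableFullyAsyncSnapshots()

env.setStateBackend(backend)

If the semi-async backup path is what corrupts the data, this should change
the behavior.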

On Fri, Sep 30, 2016 at 10:30 AM, Stefan Richter <
s.richter@data-artisans.com> wrote:

> Hi Shannon,
>
> from your new stack trace and the bogus class names, I agree with Stephan
> that either serialization or the database itself is corrupted in some way.
> Could you please check if this problem only happens if checkpointing is
> enabled? If yes, does switching to fully async snapshots change the
> behavior?
>
> Best,
> Stefan
>
> On 29.09.2016 at 21:12, Shannon Carey <sc...@expedia.com> wrote:
>
> Hi Stephan!
>
> The failure appeared to occur every 10 minutes, which is also the interval
> for checkpointing. However, I agree with you that the stack trace appears
> to be independent. Could this perhaps be an issue with multithreading,
> where the checkpoint mechanism is somehow interfering with ongoing
> operation of the state backend? I've never seen this problem until now, so
> I am a little suspicious that it might be due to something in my code, but
> so far it's been difficult to figure out what that might be.
>
> I am using the default, SemiAsync snapshot mode.
>
> The classes of the data flow are a bit too large to put here in their
> entirety. We are using Scala case classes, Java classes generated by Avro,
> Tuples, Scala Option, java.util.UUID and Scala mutable.Map. The majority of
> these classes have been operational in other jobs before. I added a unit
> test for the class which contains a mutable.Map to see whether that was
> causing a problem. Does this look like a reasonable unit test to verify
> Flink serializability to you?
>
> it("roundtrip serializes in Flink") {
>   val millis: Long = TimeUnit.DAYS.toMillis(2)
>   val original: PreferredAirportDailySum = new PreferredAirportDailySum(millis)
>   original.add("a", TimestampedAirportCount(4, 6))
>   original.add("b", TimestampedAirportCount(7, 8))
>
>   val deserialized: PreferredAirportDailySum = serializationRoundTrip(original, 100)
>
>   deserialized.timestamp shouldBe millis
>   deserialized.immutableItems("a") shouldBe TimestampedAirportCount(4, 6)
>   deserialized.immutableItems("b") shouldBe TimestampedAirportCount(7, 8)
> }
>
> def serializationRoundTrip[T : ClassTag : TypeInformation](original: T, expectedMaxBytes: Int): T = {
>   val typeInfo = implicitly[TypeInformation[T]]
>   val serializer: TypeSerializer[T] = typeInfo.createSerializer(new ExecutionConfig)
>
>   val out: ByteArrayOutputStream = new ByteArrayOutputStream(expectedMaxBytes)
>   val outputView: DataOutputView = new DataOutputViewStreamWrapper(out)
>   serializer.serialize(original, outputView)
>
>   out.size() should be <= expectedMaxBytes
>
>   val inputView: DataInputViewStreamWrapper =
>     new DataInputViewStreamWrapper(new ByteArrayInputStream(out.toByteArray))
>   val deserialized: T = serializer.deserialize(inputView)
>
>   deserialized
> }
>
> I tried running my job in a local one-slot cluster with RocksDB enabled
> but checkpointing to local filesystem. Similar errors occur, but are more
> sporadic. I have not yet been able to capture the error while debugging,
> but if I do I will provide additional information.
>
> I noticed that locally, execution only reaches
> DefaultClassResolver#readName(Input)->Class.forName() when a checkpoint
> completes. Also, the timing of checkpointing is a bit odd: in the example
> below the checkpoint takes 200s to complete after being triggered even
> though RocksDB reports that it only took ~100ms.
>
> 2016-09-29 12:56:17,619 INFO  CheckpointCoordinator     - Triggering
> checkpoint 2 @ 1475171777619
> 2016-09-29 12:59:38,079 INFO  RocksDBStateBackend  - RocksDB
> (/var/folders/…./WindowOperator_38_0/dummy_state/730773a2-bb33-4021-aa9e-9b4e3cb172f3/db)
> backup (synchronous part) took 7 ms.
> 2016-09-29 12:59:38,214 INFO  RocksDBStateBackend  - RocksDB
> materialization from /var/folders/…/WindowOperator_
> 38_0/dummy_state/730773a2-bb33-4021-aa9e-9b4e3cb172f3/local-chk-2 to
> file:/var/flinkstate/…/WindowOperator_38_0/dummy_state/chk-2
> (asynchronous part) took 96 ms.
> 2016-09-29 12:59:38,333 INFO  CheckpointCoordinator     - Completed
> checkpoint 2 (in 200621 ms)
>
> Do you have any other advice?
>
> Exceptions from local execution:
>
> java.lang.RuntimeException: Error while adding data to RocksDB
> at org.apache.flink.contrib.streaming.state.RocksDBFoldingState.add(
> RocksDBFoldingState.java:125)
> at org.apache.flink.streaming.runtime.operators.windowing.
> WindowOperator.processElement(WindowOperator.java:382)
> at org.apache.flink.streaming.runtime.io.StreamInputProcessor.
> processInput(StreamInputProcessor.java:176)
> at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(
> OneInputStreamTask.java:66)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:266)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: com.esotericsoftware.kryo.KryoException: Unable to find class:
> 'CLE
> at com.esotericsoftware.kryo.util.DefaultClassResolver.
> readName(DefaultClassResolver.java:138)
> at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(
> DefaultClassResolver.java:115)
> at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
> at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(
> CollectionSerializer.java:116)
> at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(
> CollectionSerializer.java:22)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
> at org.apache.flink.api.java.typeutils.runtime.kryo.
> KryoSerializer.deserialize(KryoSerializer.java:232)
> at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(
> CaseClassSerializer.scala:113)
> at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(
> CaseClassSerializer.scala:30)
> at org.apache.flink.contrib.streaming.state.RocksDBFoldingState.add(
> RocksDBFoldingState.java:118)
> ... 6 more
> Caused by: java.lang.ClassNotFoundException: 'CLE
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:348)
> at com.esotericsoftware.kryo.util.DefaultClassResolver.
> readName(DefaultClassResolver.java:136)
> ... 16 more
>
> After that one happened, this one happened many times:
>
> java.lang.RuntimeException: Failed to deserialize state handle and setup
> initial operator state.
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: Unable to deserialize default value.
> at org.apache.flink.api.common.state.StateDescriptor.
> readObject(StateDescriptor.java:285)
> at sun.reflect.GeneratedMethodAccessor34.invoke(Unknown Source)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(
> DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
> at java.io.ObjectInputStream.readOrdinaryObject(
> ObjectInputStream.java:1801)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
> at java.util.ArrayList.readObject(ArrayList.java:791)
> at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(
> DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
> at java.io.ObjectInputStream.readOrdinaryObject(
> ObjectInputStream.java:1801)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at java.io.ObjectInputStream.defaultReadFields(
> ObjectInputStream.java:2000)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
> at java.io.ObjectInputStream.readOrdinaryObject(
> ObjectInputStream.java:1801)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
> at java.util.HashMap.readObject(HashMap.java:1396)
> at sun.reflect.GeneratedMethodAccessor9.invoke(Unknown Source)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(
> DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
> at java.io.ObjectInputStream.readOrdinaryObject(
> ObjectInputStream.java:1801)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at java.io.ObjectInputStream.defaultReadFields(
> ObjectInputStream.java:2000)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
> at java.io.ObjectInputStream.readOrdinaryObject(
> ObjectInputStream.java:1801)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1707)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1345)
> at java.io.ObjectInputStream.defaultReadFields(
> ObjectInputStream.java:2000)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
> at java.io.ObjectInputStream.readOrdinaryObject(
> ObjectInputStream.java:1801)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
> at org.apache.flink.util.InstantiationUtil.deserializeObject(
> InstantiationUtil.java:291)
> at org.apache.flink.util.SerializedValue.deserializeValue(
> SerializedValue.java:58)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:542)
> ... 1 more
> Caused by: java.lang.StringIndexOutOfBoundsException: String index out of
> range: -2
> at java.lang.String.<init>(String.java:196)
> at com.esotericsoftware.kryo.io.Input.readString(Input.java:466)
> at com.esotericsoftware.kryo.util.DefaultClassResolver.
> readName(DefaultClassResolver.java:132)
> at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(
> DefaultClassResolver.java:115)
> at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
> at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(
> CollectionSerializer.java:116)
> at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(
> CollectionSerializer.java:22)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
> at org.apache.flink.api.java.typeutils.runtime.kryo.
> KryoSerializer.deserialize(KryoSerializer.java:232)
> at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(
> CaseClassSerializer.scala:113)
> at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(
> CaseClassSerializer.scala:30)
> at org.apache.flink.api.common.state.StateDescriptor.
> readObject(StateDescriptor.java:282)
> ... 44 more
>
> During another execution, this one occurred several times:
>
> java.lang.RuntimeException: Failed to deserialize state handle and setup
> initial operator state.
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: Unable to deserialize default value.
> at org.apache.flink.api.common.state.StateDescriptor.
> readObject(StateDescriptor.java:285)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(
> NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(
> DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
> at java.util.ArrayList.readObject(ArrayList.java:791)
> at sun.reflect.GeneratedMethodAccessor42.invoke(Unknown Source)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
> at java.util.HashMap.readObject(HashMap.java:1396)
> at sun.reflect.GeneratedMethodAccessor41.invoke(Unknown Source)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1707)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1345)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
> at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:291)
> at org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:58)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:542)
> ... 1 more
> Caused by: com.esotericsoftware.kryo.KryoException: Unable to find class: #
> at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
> at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
> at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
> at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:232)
> at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:113)
> at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:30)
> at org.apache.flink.api.common.state.StateDescriptor.readObject(StateDescriptor.java:282)
> ... 45 more
> Caused by: java.lang.ClassNotFoundException: #
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:348)
> at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)
> ... 52 more
>
>
>
> From: Stephan Ewen <se...@apache.org>
> Date: Wednesday, September 28, 2016 at 1:18 PM
> To: <us...@flink.apache.org>
> Subject: Re: Error while adding data to RocksDB: No more bytes left
>
> Hi Shannon!
>
> The stack trace you pasted is independent of checkpointing - it seems to
> be from the regular processing. Does this only happen when checkpoints are
> activated?
>
> Can you also share which checkpoint method you use?
>   - FullyAsynchronous
>   - SemiAsynchronous
>
> I think there are two possibilities for what can happen:
>   - There is a serialization inconsistency in the Serializers. If that is
> the case, this error should occur almost in a deterministic fashion. To
> debug that, it would be good to know which data types you are using.
>   - There is a bug in RocksDB (or Flink's wrapping of it) where data gets
> corrupted when using the snapshot feature. That would explain why this only
> occurs when checkpoints are happening.
>
> Greetings,
> Stephan
>
>
> On Wed, Sep 28, 2016 at 7:28 PM, Shannon Carey <sc...@expedia.com> wrote:
>
>> It appears that when one of my jobs tries to checkpoint, the following
>> exception is triggered. I am using Flink 1.1.1 in Scala 2.11. RocksDB
>> checkpoints are being saved to S3.
>>
>> java.lang.RuntimeException: Error while adding data to RocksDB
>>         at org.apache.flink.contrib.streaming.state.RocksDBFoldingState.add(RocksDBFoldingState.java:125)
>>         at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:382)
>>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:176)
>>         at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:66)
>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:266)
>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>>         at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.io.EOFException: No more bytes left.
>>         at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:77)
>>         at com.esotericsoftware.kryo.io.Input.readVarLong(Input.java:690)
>>         at com.esotericsoftware.kryo.io.Input.readLong(Input.java:685)
>>         at com.esotericsoftware.kryo.serializers.UnsafeCacheFields$UnsafeLongField.read(UnsafeCacheFields.java:160)
>>         at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:232)
>>         at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:113)
>>         at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:30)
>>         at org.apache.flink.contrib.streaming.state.RocksDBFoldingState.add(RocksDBFoldingState.java:118)
>>         ... 6 more
>>
>> Thanks for any help!
>>
>> Shannon
>>
>
>
>

Re: Error while adding data to RocksDB: No more bytes left

Posted by Stefan Richter <s....@data-artisans.com>.
Hi Shannon,

From your new stack trace and the bogus class names, I agree with Stephan that either the serialization or the database itself is corrupted in some way. Could you please check whether this problem only happens when checkpointing is enabled? If so, does switching to fully async snapshots change the behavior?
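
For reference, switching to fully asynchronous snapshots would look roughly like this (a minimal sketch against the Flink 1.1 API; the checkpoint URI is just a placeholder):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
// Placeholder checkpoint URI; point this at your real S3 or file path.
val backend = new RocksDBStateBackend("s3://your-bucket/flink/checkpoints")
// Semi-asynchronous is the default; this switches the backend to the fully asynchronous mode.
backend.enableFullyAsyncSnapshots()
env.setStateBackend(backend)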

Best,
Stefan

> On 29.09.2016, at 21:12, Shannon Carey <sc...@expedia.com> wrote:
> 
> Hi Stephan!
> 
> The failure appeared to occur every 10 minutes, which is also the interval for checkpointing. However, I agree with you that the stack trace appears to be independent. Could this perhaps be an issue with multithreading, where the checkpoint mechanism is somehow interfering with ongoing operation of the state backend? I've never seen this problem until now, so I am a little suspicious that it might be due to something in my code, but so far it's been difficult to figure out what that might be.
> 
> I am using the default, SemiAsync snapshot mode.
> 
> The classes of the data flow are a bit too large to put here in their entirety. We are using Scala case classes, Java classes generated by Avro, Tuples, Scala Option, java.util.UUID and Scala mutable.Map. The majority of these classes have been operational in other jobs before. I added a unit test for the class which contains a mutable.Map to see whether that was causing a problem. Does this look like a reasonable unit test to verify Flink serializability to you?
> it("roundtrip serializes in Flink") {
>   val millis: Long = TimeUnit.DAYS.toMillis(2)
>   val original: PreferredAirportDailySum = new PreferredAirportDailySum(millis)
>   original.add("a", TimestampedAirportCount(4, 6))
>   original.add("b", TimestampedAirportCount(7, 8))
> 
>   val deserialized: PreferredAirportDailySum = serializationRoundTrip(original, 100)
> 
>   deserialized.timestamp shouldBe millis
>   deserialized.immutableItems("a") shouldBe TimestampedAirportCount(4, 6)
>   deserialized.immutableItems("b") shouldBe TimestampedAirportCount(7, 8)
> }
> 
> def serializationRoundTrip[T : ClassTag : TypeInformation](original: T, expectedMaxBytes: Int): T = {
>   val typeInfo = implicitly[TypeInformation[T]]
>   val serializer: TypeSerializer[T] = typeInfo.createSerializer(new ExecutionConfig)
> 
>   val out: ByteArrayOutputStream = new ByteArrayOutputStream(expectedMaxBytes)
>   val outputView: DataOutputView = new DataOutputViewStreamWrapper(out)
>   serializer.serialize(original, outputView)
> 
>   out.size() should be <= expectedMaxBytes
> 
>   val inputView: DataInputViewStreamWrapper =
>     new DataInputViewStreamWrapper(new ByteArrayInputStream(out.toByteArray))
>   val deserialized: T = serializer.deserialize(inputView)
> 
>   deserialized
> }
> I tried running my job in a local one-slot cluster with RocksDB enabled but checkpointing to local filesystem. Similar errors occur, but are more sporadic. I have not yet been able to capture the error while debugging, but if I do I will provide additional information.
> 
> I noticed that locally, execution only reaches DefaultClassResolver#readName(Input)->Class.forName() when a checkpoint completes. Also, the timing of checkpointing is a bit odd: in the example below, the checkpoint takes 200s to complete after being triggered even though RocksDB reports that it only took ~100ms.
> 
> 2016-09-29 12:56:17,619 INFO  CheckpointCoordinator     - Triggering checkpoint 2 @ 1475171777619
> 2016-09-29 12:59:38,079 INFO  RocksDBStateBackend  - RocksDB (/var/folders/…./WindowOperator_38_0/dummy_state/730773a2-bb33-4021-aa9e-9b4e3cb172f3/db) backup (synchronous part) took 7 ms.
> 2016-09-29 12:59:38,214 INFO  RocksDBStateBackend  - RocksDB materialization from /var/folders/…/WindowOperator_38_0/dummy_state/730773a2-bb33-4021-aa9e-9b4e3cb172f3/local-chk-2 to file:/var/flinkstate/…/WindowOperator_38_0/dummy_state/chk-2 (asynchronous part) took 96 ms.
> 2016-09-29 12:59:38,333 INFO  CheckpointCoordinator     - Completed checkpoint 2 (in 200621 ms)
> 
> Do you have any other advice?
> 
> Exceptions from local execution:
> 
> java.lang.RuntimeException: Error while adding data to RocksDB
> at org.apache.flink.contrib.streaming.state.RocksDBFoldingState.add(RocksDBFoldingState.java:125)
> at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:382)
> at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:176)
> at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:66)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:266)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: com.esotericsoftware.kryo.KryoException: Unable to find class: 'CLE
> at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
> at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
> at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
> at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
> at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
> at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:232)
> at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:113)
> at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:30)
> at org.apache.flink.contrib.streaming.state.RocksDBFoldingState.add(RocksDBFoldingState.java:118)
> ... 6 more
> Caused by: java.lang.ClassNotFoundException: 'CLE
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:348)
> at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)
> ... 16 more
> 
> After that one happened, this one happened many times:
> 
> java.lang.RuntimeException: Failed to deserialize state handle and setup initial operator state.
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: Unable to deserialize default value.
> at org.apache.flink.api.common.state.StateDescriptor.readObject(StateDescriptor.java:285)
> at sun.reflect.GeneratedMethodAccessor34.invoke(Unknown Source)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
> at java.util.ArrayList.readObject(ArrayList.java:791)
> at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
> at java.util.HashMap.readObject(HashMap.java:1396)
> at sun.reflect.GeneratedMethodAccessor9.invoke(Unknown Source)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1707)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1345)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
> at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:291)
> at org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:58)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:542)
> ... 1 more
> Caused by: java.lang.StringIndexOutOfBoundsException: String index out of range: -2
> at java.lang.String.<init>(String.java:196)
> at com.esotericsoftware.kryo.io.Input.readString(Input.java:466)
> at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:132)
> at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
> at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
> at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
> at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
> at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:232)
> at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:113)
> at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:30)
> at org.apache.flink.api.common.state.StateDescriptor.readObject(StateDescriptor.java:282)
> ... 44 more
> 
> During another execution, this one occurred several times:
> 
> java.lang.RuntimeException: Failed to deserialize state handle and setup initial operator state.
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: Unable to deserialize default value.
> at org.apache.flink.api.common.state.StateDescriptor.readObject(StateDescriptor.java:285)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
> at java.util.ArrayList.readObject(ArrayList.java:791)
> at sun.reflect.GeneratedMethodAccessor42.invoke(Unknown Source)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
> at java.util.HashMap.readObject(HashMap.java:1396)
> at sun.reflect.GeneratedMethodAccessor41.invoke(Unknown Source)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1707)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1345)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
> at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:291)
> at org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:58)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:542)
> ... 1 more
> Caused by: com.esotericsoftware.kryo.KryoException: Unable to find class: #
> at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
> at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
> at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
> at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:232)
> at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:113)
> at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:30)
> at org.apache.flink.api.common.state.StateDescriptor.readObject(StateDescriptor.java:282)
> ... 45 more
> Caused by: java.lang.ClassNotFoundException: #
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:348)
> at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)
> ... 52 more
> 
> 
> 
> From: Stephan Ewen <sewen@apache.org>
> Date: Wednesday, September 28, 2016 at 1:18 PM
> To: <user@flink.apache.org>
> Subject: Re: Error while adding data to RocksDB: No more bytes left
> 
> Hi Shannon!
> 
> The stack trace you pasted is independent of checkpointing - it seems to be from the regular processing. Does this only happen when checkpoints are activated?
> 
> Can you also share which checkpoint method you use?
>   - FullyAsynchronous
>   - SemiAsynchronous
> 
> I think there are two possibilities for what can happen:
>   - There is a serialization inconsistency in the Serializers. If that is the case, this error should occur almost in a deterministic fashion. To debug that, it would be good to know which data types you are using.
>   - There is a bug in RocksDB (or Flink's wrapping of it) where data gets corrupted when using the snapshot feature. That would explain why this only occurs when checkpoints are happening.
> 
> Greetings,
> Stephan
> 
> 
> On Wed, Sep 28, 2016 at 7:28 PM, Shannon Carey <scarey@expedia.com> wrote:
> It appears that when one of my jobs tries to checkpoint, the following exception is triggered. I am using Flink 1.1.1 in Scala 2.11. RocksDB checkpoints are being saved to S3.
> 
> java.lang.RuntimeException: Error while adding data to RocksDB
>         at org.apache.flink.contrib.streaming.state.RocksDBFoldingState.add(RocksDBFoldingState.java:125)
>         at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:382)
>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:176)
>         at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:66)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:266)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.EOFException: No more bytes left.
>         at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:77)
>         at com.esotericsoftware.kryo.io.Input.readVarLong(Input.java:690)
>         at com.esotericsoftware.kryo.io.Input.readLong(Input.java:685)
>         at com.esotericsoftware.kryo.serializers.UnsafeCacheFields$UnsafeLongField.read(UnsafeCacheFields.java:160)
>         at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:232)
>         at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:113)
>         at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:30)
>         at org.apache.flink.contrib.streaming.state.RocksDBFoldingState.add(RocksDBFoldingState.java:118)
>         ... 6 more
> 
> Thanks for any help!
> 
> Shannon
> 


Re: Error while adding data to RocksDB: No more bytes left

Posted by Shannon Carey <sc...@expedia.com>.
Hi Stephan!

The failure appeared to occur every 10 minutes, which is also the interval for checkpointing. However, I agree with you that the stack trace appears to be independent. Could this perhaps be an issue with multithreading, where the checkpoint mechanism is somehow interfering with ongoing operation of the state backend? I've never seen this problem until now, so I am a little suspicious that it might be due to something in my code, but so far it's been difficult to figure out what that might be.

I am using the default, SemiAsync snapshot mode.

The classes of the data flow are a bit too large to put here in their entirety. We are using Scala case classes, Java classes generated by Avro, Tuples, Scala Option, java.util.UUID and Scala mutable.Map. The majority of these classes have been operational in other jobs before. I added a unit test for the class which contains a mutable.Map to see whether that was causing a problem. Does this look like a reasonable unit test to verify Flink serializability to you?

it("roundtrip serializes in Flink") {
  val millis: Long = TimeUnit.DAYS.toMillis(2)
  val original: PreferredAirportDailySum = new PreferredAirportDailySum(millis)
  original.add("a", TimestampedAirportCount(4, 6))
  original.add("b", TimestampedAirportCount(7, 8))

  val deserialized: PreferredAirportDailySum = serializationRoundTrip(original, 100)

  deserialized.timestamp shouldBe millis
  deserialized.immutableItems("a") shouldBe TimestampedAirportCount(4, 6)
  deserialized.immutableItems("b") shouldBe TimestampedAirportCount(7, 8)
}

def serializationRoundTrip[T : ClassTag : TypeInformation](original: T, expectedMaxBytes: Int): T = {
  val typeInfo = implicitly[TypeInformation[T]]
  val serializer: TypeSerializer[T] = typeInfo.createSerializer(new ExecutionConfig)

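  // Serialize the value into an in-memory buffer using the serializer Flink derives for T.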
  val out: ByteArrayOutputStream = new ByteArrayOutputStream(expectedMaxBytes)
  val outputView: DataOutputView = new DataOutputViewStreamWrapper(out)
  serializer.serialize(original, outputView)

  out.size() should be <= expectedMaxBytes

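  // Read the bytes back with the same serializer and return the reconstructed value.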
  val inputView: DataInputViewStreamWrapper =
    new DataInputViewStreamWrapper(new ByteArrayInputStream(out.toByteArray))
  val deserialized: T = serializer.deserialize(inputView)

  deserialized
}

I tried running my job in a local one-slot cluster with RocksDB enabled but checkpointing to local filesystem. Similar errors occur, but are more sporadic. I have not yet been able to capture the error while debugging, but if I do I will provide additional information.
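
For reference, that local setup looks roughly like this (a sketch; the parallelism, checkpoint interval and state path are placeholders rather than the exact values from my job):

import java.util.concurrent.TimeUnit
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

// Single-slot local environment with the RocksDB backend checkpointing to the local filesystem.
val env = StreamExecutionEnvironment.createLocalEnvironment(1)
env.setStateBackend(new RocksDBStateBackend("file:///var/flinkstate"))
env.enableCheckpointing(TimeUnit.MINUTES.toMillis(10))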

I noticed that locally, execution only reaches DefaultClassResolver#readName(Input)->Class.forName() when a checkpoint completes. Also, the timing of checkpointing is a bit odd: in the example below, the checkpoint takes 200s to complete after being triggered even though RocksDB reports that it only took ~100ms.

2016-09-29 12:56:17,619 INFO  CheckpointCoordinator     - Triggering checkpoint 2 @ 1475171777619
2016-09-29 12:59:38,079 INFO  RocksDBStateBackend  - RocksDB (/var/folders/…./WindowOperator_38_0/dummy_state/730773a2-bb33-4021-aa9e-9b4e3cb172f3/db) backup (synchronous part) took 7 ms.
2016-09-29 12:59:38,214 INFO  RocksDBStateBackend  - RocksDB materialization from /var/folders/…/WindowOperator_38_0/dummy_state/730773a2-bb33-4021-aa9e-9b4e3cb172f3/local-chk-2 to file:/var/flinkstate/…/WindowOperator_38_0/dummy_state/chk-2 (asynchronous part) took 96 ms.
2016-09-29 12:59:38,333 INFO  CheckpointCoordinator     - Completed checkpoint 2 (in 200621 ms)

Do you have any other advice?

Exceptions from local execution:

java.lang.RuntimeException: Error while adding data to RocksDB
at org.apache.flink.contrib.streaming.state.RocksDBFoldingState.add(RocksDBFoldingState.java:125)
at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:382)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:176)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:66)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:266)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
Caused by: com.esotericsoftware.kryo.KryoException: Unable to find class: 'CLE
at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:232)
at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:113)
at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:30)
at org.apache.flink.contrib.streaming.state.RocksDBFoldingState.add(RocksDBFoldingState.java:118)
... 6 more
Caused by: java.lang.ClassNotFoundException: 'CLE
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)
... 16 more

After that one happened, this one happened many times:

java.lang.RuntimeException: Failed to deserialize state handle and setup initial operator state.
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Unable to deserialize default value.
at org.apache.flink.api.common.state.StateDescriptor.readObject(StateDescriptor.java:285)
at sun.reflect.GeneratedMethodAccessor34.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at java.util.ArrayList.readObject(ArrayList.java:791)
at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at java.util.HashMap.readObject(HashMap.java:1396)
at sun.reflect.GeneratedMethodAccessor9.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1707)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1345)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:291)
at org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:58)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:542)
... 1 more
Caused by: java.lang.StringIndexOutOfBoundsException: String index out of range: -2
at java.lang.String.<init>(String.java:196)
at com.esotericsoftware.kryo.io.Input.readString(Input.java:466)
at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:132)
at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:232)
at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:113)
at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:30)
at org.apache.flink.api.common.state.StateDescriptor.readObject(StateDescriptor.java:282)
... 44 more

During another execution, this one occurred several times:

java.lang.RuntimeException: Failed to deserialize state handle and setup initial operator state.
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Unable to deserialize default value.
at org.apache.flink.api.common.state.StateDescriptor.readObject(StateDescriptor.java:285)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at java.util.ArrayList.readObject(ArrayList.java:791)
at sun.reflect.GeneratedMethodAccessor42.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at java.util.HashMap.readObject(HashMap.java:1396)
at sun.reflect.GeneratedMethodAccessor41.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1707)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1345)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:291)
at org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:58)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:542)
... 1 more
Caused by: com.esotericsoftware.kryo.KryoException: Unable to find class: #
at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:232)
at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:113)
at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:30)
at org.apache.flink.api.common.state.StateDescriptor.readObject(StateDescriptor.java:282)
... 45 more
Caused by: java.lang.ClassNotFoundException: #
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)
... 52 more



From: Stephan Ewen <se...@apache.org>
Date: Wednesday, September 28, 2016 at 1:18 PM
To: <us...@flink.apache.org>
Subject: Re: Error while adding data to RocksDB: No more bytes left

Hi Shannon!

The stack trace you pasted is independent of checkpointing - it seems to be from the regular processing. Does this only happen when checkpoints are activated?

Can you also share which checkpoint method you use?
  - FullyAsynchronous
  - SemiAsynchronous

I think there are two possibilities for what can happen:
  - There is a serialization inconsistency in the Serializers. If that is the case, this error should occur almost in a deterministic fashion. To debug that, it would be good to know which data types you are using.
  - There is a bug in RocksDB (or Flink's wrapping of it) where data gets corrupted when using the snapshot feature. That would explain why this only occurs when checkpoints are happening.

Greetings,
Stephan


On Wed, Sep 28, 2016 at 7:28 PM, Shannon Carey <sc...@expedia.com> wrote:
It appears that when one of my jobs tries to checkpoint, the following exception is triggered. I am using Flink 1.1.1 in Scala 2.11. RocksDB checkpoints are being saved to S3.

java.lang.RuntimeException: Error while adding data to RocksDB
        at org.apache.flink.contrib.streaming.state.RocksDBFoldingState.add(RocksDBFoldingState.java:125)
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:382)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:176)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:66)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:266)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.EOFException: No more bytes left.
        at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:77)
        at com.esotericsoftware.kryo.io.Input.readVarLong(Input.java:690)
        at com.esotericsoftware.kryo.io.Input.readLong(Input.java:685)
        at com.esotericsoftware.kryo.serializers.UnsafeCacheFields$UnsafeLongField.read(UnsafeCacheFields.java:160)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:232)
        at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:113)
        at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:30)
        at org.apache.flink.contrib.streaming.state.RocksDBFoldingState.add(RocksDBFoldingState.java:118)
        ... 6 more

Thanks for any help!

Shannon


Re: Error while adding data to RocksDB: No more bytes left

Posted by Stephan Ewen <se...@apache.org>.
Hi Shannon!

The stack trace you pasted is independent of checkpointing - it seems to be
from the regular processing. Does this only happen when checkpoints are
activated?

Can you also share which checkpoint method you use?
  - FullyAsynchronous
  - SemiAsynchronous

I think there are two possibilities for what can happen:
  - There is a serialization inconsistency in the Serializers. If that is
the case, this error should occur almost in a deterministic fashion. To
debug that, it would be good to know which data types you are using (a
quick way to check is sketched after this list).
  - There is a bug in RocksDB (or Flink's wrapping of it) where data gets
corrupted when using the snapshot feature. That would explain why this only
occurs when checkpoints are happening.
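
For instance, printing the TypeInformation and serializer that Flink
derives for one of your types shows whether it falls back to Kryo via
GenericTypeInfo. This is only a sketch; MyCaseClass stands in for one of
your actual types:

import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.scala._

// Prints e.g. CaseClassTypeInfo vs. GenericTypeInfo, and the serializer Flink will pick.
val typeInfo = createTypeInformation[MyCaseClass]
println(typeInfo)
println(typeInfo.createSerializer(new ExecutionConfig))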

Greetings,
Stephan


On Wed, Sep 28, 2016 at 7:28 PM, Shannon Carey <sc...@expedia.com> wrote:

> It appears that when one of my jobs tries to checkpoint, the following
> exception is triggered. I am using Flink 1.1.1 in Scala 2.11. RocksDB
> checkpoints are being saved to S3.
>
> java.lang.RuntimeException: Error while adding data to RocksDB
>         at org.apache.flink.contrib.streaming.state.RocksDBFoldingState.add(RocksDBFoldingState.java:125)
>         at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:382)
>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:176)
>         at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:66)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:266)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.EOFException: No more bytes left.
>         at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:77)
>         at com.esotericsoftware.kryo.io.Input.readVarLong(Input.java:690)
>         at com.esotericsoftware.kryo.io.Input.readLong(Input.java:685)
>         at com.esotericsoftware.kryo.serializers.UnsafeCacheFields$UnsafeLongField.read(UnsafeCacheFields.java:160)
>         at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:232)
>         at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:113)
>         at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:30)
>         at org.apache.flink.contrib.streaming.state.RocksDBFoldingState.add(RocksDBFoldingState.java:118)
>         ... 6 more
>
> Thanks for any help!
>
> Shannon
>