Posted to users@kafka.apache.org by Stefan Hoffmeister <st...@econos.de> on 2019/10/08 14:53:45 UTC

Corrupted RocksDB KeyValueStore.get() returns null?

Hello everybody,

we seem to experience very unexpected behaviour from the Kafka Streams 2.2.0 framework, where KeyValueStore.get() returns null, although it really should return a value.

This looks like content corruption somewhere at or around the caching layer in Kafka Streams, with RocksDB backing the persistent state store.

On the implementation side, we simply

    streamsBuilder.addStateStore(Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("my-topic"),

then retrieve and cache the store in init() of a Transformer, and use it in transform() (see the sketch below).
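For reference, a minimal sketch of that wiring (the input/output topic names, serdes, and the value handling are assumptions filled in for illustration, not our actual code):

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.Transformer;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.state.KeyValueStore;
    import org.apache.kafka.streams.state.Stores;

    public class MyTransformer implements Transformer<String, String, KeyValue<String, String>> {

        static final String STORE_NAME = "my-topic";

        private KeyValueStore<String, String> store;

        // Topology wiring: register the RocksDB-backed store and attach it to the transformer.
        static StreamsBuilder buildTopology() {
            final StreamsBuilder streamsBuilder = new StreamsBuilder();
            streamsBuilder.addStateStore(Stores.keyValueStoreBuilder(
                    Stores.persistentKeyValueStore(STORE_NAME),
                    Serdes.String(), Serdes.String()));            // serdes are an assumption
            streamsBuilder.<String, String>stream("input-topic")   // topic names are assumptions
                    .transform(MyTransformer::new, STORE_NAME)
                    .to("output-topic");
            return streamsBuilder;
        }

        @Override
        @SuppressWarnings("unchecked")
        public void init(final ProcessorContext context) {
            // Retrieve and cache the store once per task, as described above.
            store = (KeyValueStore<String, String>) context.getStateStore(STORE_NAME);
        }

        @Override
        public KeyValue<String, String> transform(final String key, final String value) {
            // Use the cached store; this is where the unexpected nulls show up.
            final String previous = store.get(key);
            store.put(key, value);
            return KeyValue.pair(key, previous);
        }

        @Override
        public void close() { }
    }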

We see that *very rarely* KeyValueStore.get() returns null, although the (changelog) topic (on the Kafka broker) has clear evidence that KeyValueStore.get() should have returned a value.

This was seen at exactly the same time for at least two different topics, upon starting the streaming application:

a) (changelog) topic with exactly one partition, guaranteed to contain exactly one key - had reliably returned data for four weeks

b) (changelog) topic over 16 partitions, where for about 30 calls to get(), a total of 20 returned data as expected, and about 10 returned an incorrect value of null.

Has anyone seen something like that before, any pointers to root cause?

The following items have made it onto our review agenda:

a) virtual network instability, possibly leading to interesting RocksDB startup behaviour (org.apache.kafka.streams.processor.internals.StoreChangelogReader#initialize)

b) on-disk data corruption (Linux -> NFS as persistent cache storage for RocksDB)


Any insight or pointers greatly appreciated!

Many thanks!

Stefan

Re: Corrupted RocksDB KeyValueStore.get() returns null?

Posted by Stefan Hoffmeister <st...@gmail.com>.

On 2019/10/11 05:25:04, "Matthias J. Sax" <ma...@confluent.io> wrote: 
> Hard to say.
> 
> Are you sure you query the correct instance? 

Yes, absolutely.

> As you have 16 partitions,
> you need to ensure that the instance you query actually hosts the key.
> Or do you have only one instance running?

We can guarantee that only one instance is running.

> How do you know that there should be a value for the key?

Business logic and awareness of the data that flowed allow us to reason about the state of two different topics:

- the "guaranteed single key" state store is a highwater mark on a single partition which we only _write_ under well-defined circumstances. The changelog topic contains our highwater mark value *twice*. This is only possible if the .get() returned null before. (Plus we see log entries in the external system to that effect)

- the multi-partition changelog topic backs a duplicate-detection state store. By construction of the writing transformer, this changelog can only contain one record per key (see the sketch after this list). In our incident, we saw two records in the changelog for some keys, which is only possible if get() returned null for those keys although they were present before.
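To make the "one record per key by construction" argument concrete, a minimal, hypothetical sketch of such a duplicate-detection transformer might look like the following (store name, types, and the "seen" marker value are illustrative assumptions, not our actual code). The store is written only when get() returns null, so a second changelog record for the same key implies a spurious null:

    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.kstream.Transformer;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.state.KeyValueStore;

    public class DedupTransformer implements Transformer<String, String, KeyValue<String, String>> {

        private KeyValueStore<String, String> dedupStore;

        @Override
        @SuppressWarnings("unchecked")
        public void init(final ProcessorContext context) {
            dedupStore = (KeyValueStore<String, String>) context.getStateStore("dedup-store");
        }

        @Override
        public KeyValue<String, String> transform(final String key, final String value) {
            if (dedupStore.get(key) != null) {
                // Key seen before: drop the record and, crucially, do NOT write the
                // store again, so the changelog can only ever hold one record per key.
                return null;
            }
            // The only code path that writes the store, hence the only source of
            // changelog records. A spurious null from get() re-enters this path
            // and produces the second changelog record we observed.
            dedupStore.put(key, "seen");
            return KeyValue.pair(key, value);
        }

        @Override
        public void close() { }
    }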

All this was in a scenario where the RocksDB data had to be physically restored (because the stream sub-topology order changed, hence the tasks changed, hence the original data on the Linux NFS disk "disappeared").

We are lucky in that we only have very few records to look at. Compaction muddies the water a little bit, but does not prevent analysis.

FWIW, there is a parallel conversation at https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1570604812005000 - there I have noted a hunch / working hypothesis that stream processing started although the RocksDB state restore had not yet _fully_ completed, possibly due to an "unhappy" network.

All the data in the two changelog topics under review supports that hypothesis.
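One way to probe that hypothesis (a sketch only; the listener API is standard Kafka Streams, but the logging and its timing interpretation are our assumption) would be to register a global StateRestoreListener before start() and correlate its restore-start/restore-end output with the timestamps of the first transform() calls:

    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.processor.StateRestoreListener;

    public final class RestoreLogging {

        // Log state-store restore progress; "streams" is the application's KafkaStreams
        // instance, and the listener must be registered before streams.start().
        public static void register(final KafkaStreams streams) {
            streams.setGlobalStateRestoreListener(new StateRestoreListener() {
                @Override
                public void onRestoreStart(final TopicPartition partition, final String storeName,
                                           final long startingOffset, final long endingOffset) {
                    System.out.printf("restore start %s %s offsets %d..%d%n",
                            storeName, partition, startingOffset, endingOffset);
                }

                @Override
                public void onBatchRestored(final TopicPartition partition, final String storeName,
                                            final long batchEndOffset, final long numRestored) {
                    // per-batch progress not needed for this check
                }

                @Override
                public void onRestoreEnd(final TopicPartition partition, final String storeName,
                                         final long totalRestored) {
                    System.out.printf("restore end   %s %s total=%d%n",
                            storeName, partition, totalRestored);
                }
            });
        }
    }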


Re: Corrupted RocksDB KeyValueStore.get() returns null?

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Hard to say.

Are you sure you query the correct instance? As you have 16 partitions,
you need to ensure that the instance you query actually hosts the key.
Or do you have only one instance running?
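For the multi-instance case, something like the following (a sketch; the store name and key serializer are assumptions) tells you which instance hosts a given key:

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.state.StreamsMetadata;

    public final class KeyLocation {

        // Ask the running KafkaStreams instance which host owns a key for the
        // store "my-topic"; only meaningful when more than one instance is running.
        public static void print(final KafkaStreams streams, final String key) {
            final StreamsMetadata metadata = streams.metadataForKey(
                    "my-topic", key, Serdes.String().serializer());
            System.out.println("key '" + key + "' is hosted on "
                    + metadata.host() + ":" + metadata.port());
        }
    }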

How do you know that there should be a value for the key? Note that
everything is async and continuously changing, and thus it might be hard to
reason about the system state.


-Matthias

