Posted to users@kafka.apache.org by Patrik Kleindl <pk...@gmail.com> on 2018/10/23 13:24:33 UTC

RocksDB not closed on error during CachingKeyValueStore.flush?

Hello

Can someone please verify if my assumption is correct?
In CachingKeyValueStore, if an exception is thrown during flush(), the store
will not be closed properly.

@Override
public void flush() {
    lock.writeLock().lock();
    try {
        cache.flush(cacheName);
        underlying.flush();
    } finally {
        lock.writeLock().unlock();
    }
}

@Override
public void close() {
    flush();
    underlying.close();
    cache.close(cacheName);
}
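
One way to make close() more robust (just an untested sketch to illustrate
the idea, not an actual patch; it assumes the same fields as above) would be
to chain the cleanup steps in nested try/finally blocks, so that
underlying.close() and cache.close(cacheName) run even if flush() throws:

@Override
public void close() {
    try {
        flush();
    } finally {
        try {
            // close the underlying RocksDB store even if flush() threw
            underlying.close();
        } finally {
            // always release the cache entry for this store
            cache.close(cacheName);
        }
    }
}

With this shape a flush failure still propagates as an exception, but only
after the RocksDB handle and its LOCK file have been released.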

An exception leading to this situation; note that another store is already
closed and therefore no longer available:
2018-10-04 12:18:44,961 ERROR [org.apache.kafka.streams.processor.internals.ProcessorStateManager] (...-StreamThread-8) - task [8_11] Failed to close state store ...-STATE-STORE-0000000038: :
org.apache.kafka.streams.errors.InvalidStateStoreException: Store KSTREAM-REDUCE-STATE-STORE-0000000025 is currently closed.
at org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractStateStore.validateStoreOpen(WrappedStateStore.java:70)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:150)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:38)
at org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.get(InnerMeteredKeyValueStore.java:186)
at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.get(MeteredKeyValueBytesStore.java:112)
at org.apache.kafka.streams.kstream.internals.KStreamReduce$KStreamReduceValueGetter.get(KStreamReduce.java:124)
at org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterValueGetter.get(KTableFilter.java:132)
at org.apache.kafka.streams.kstream.internals.KTableKTableRightJoin$KTableKTableRightJoinProcessor.process(KTableKTableRightJoin.java:89)
at org.apache.kafka.streams.kstream.internals.KTableKTableRightJoin$KTableKTableRightJoinProcessor.process(KTableKTableRightJoin.java:58)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
at org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply(ForwardingCacheFlushListener.java:40)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:101)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$000(CachingKeyValueStore.java:38)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(CachingKeyValueStore.java:83)
at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:141)
at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:99)
at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:125)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:123)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.close(CachingKeyValueStore.java:132)
at org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractStateStore.close(WrappedStateStore.java:89)
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.close(ProcessorStateManager.java:269)
at org.apache.kafka.streams.processor.internals.AbstractTask.closeStateManager(AbstractTask.java:245)
at org.apache.kafka.streams.processor.internals.StreamTask.closeSuspended(StreamTask.java:546)
at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:624)
at org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:410)
at org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260)
at org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1172)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747)

If the store is not closed, we have witnessed that the RocksDB lock file is
not released properly, which can lead to:

2018-10-04 12:18:59,342 ERROR [stderr] (...-StreamThread-6) - Caused by: org.rocksdb.RocksDBException: While lock file: ...-STATE-STORE-0000000038/LOCK: No locks available
2018-10-04 12:18:59,342 ERROR [stderr] (...-StreamThread-6) - 	at org.rocksdb.RocksDB.open(Native Method)
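
The locking behaviour itself is easy to reproduce outside of Kafka Streams.
The following standalone demo (hypothetical; the path and class name are made
up, and the exact wording of the error depends on the filesystem) opens the
same RocksDB directory twice without closing the first handle, which is what
a leaked store handle effectively does:

import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

public class RocksDbLockDemo {
    public static void main(String[] args) throws RocksDBException {
        RocksDB.loadLibrary();
        try (Options options = new Options().setCreateIfMissing(true)) {
            // The first handle acquires the LOCK file in the store directory.
            RocksDB first = RocksDB.open(options, "/tmp/lock-demo");
            try {
                // A second open of the same directory fails while the first
                // handle is still alive.
                RocksDB second = RocksDB.open(options, "/tmp/lock-demo");
                second.close();
            } catch (RocksDBException e) {
                System.out.println("Expected lock failure: " + e.getMessage());
            } finally {
                first.close(); // releases the LOCK file
            }
        }
    }
}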

best regards

Patrik

Re: RocksDB not closed on error during CachingKeyValueStore.flush?

Posted by Bill Bejeck <bi...@confluent.io>.
Hi Patrik,

Thanks for reporting this, it does indeed look like this could be an issue.

I've created https://issues.apache.org/jira/browse/KAFKA-7534 to look into
the issue and if required create a patch.

-Bill
