Posted to dev@kafka.apache.org by "Bill Bejeck (JIRA)" <ji...@apache.org> on 2018/10/23 14:59:00 UTC

[jira] [Created] (KAFKA-7534) Error during CachingKeyValueStore.flush may not allow RocksDB to close

Bill Bejeck created KAFKA-7534:
----------------------------------

             Summary: Error during CachingKeyValueStore.flush may not allow RocksDB to close
                 Key: KAFKA-7534
                 URL: https://issues.apache.org/jira/browse/KAFKA-7534
             Project: Kafka
          Issue Type: Bug
          Components: streams
            Reporter: Bill Bejeck
            Assignee: Bill Bejeck


@Override
public void flush() {
    lock.writeLock().lock();
    try {
        cache.flush(cacheName);
        underlying.flush();
    } finally {
        lock.writeLock().unlock();
    }
}

@Override
public void close() {
    flush();                  // if this throws, the two calls below never run
    underlying.close();
    cache.close(cacheName);
}

The exception below led to this situation. Note that another store is already
closed and therefore not available:
2018-10-04 12:18:44,961 ERROR [org.apache.kafka.streams.processor.internals.ProcessorStateManager] (...-StreamThread-8) - task [8_11] Failed to close state store ...-STATE-STORE-0000000038: :
org.apache.kafka.streams.errors.InvalidStateStoreException: Store KSTREAM-REDUCE-STATE-STORE-0000000025 is currently closed.
    at org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractStateStore.validateStoreOpen(WrappedStateStore.java:70)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:150)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:38)
    at org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.get(InnerMeteredKeyValueStore.java:186)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.get(MeteredKeyValueBytesStore.java:112)
    at org.apache.kafka.streams.kstream.internals.KStreamReduce$KStreamReduceValueGetter.get(KStreamReduce.java:124)
    at org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterValueGetter.get(KTableFilter.java:132)
    at org.apache.kafka.streams.kstream.internals.KTableKTableRightJoin$KTableKTableRightJoinProcessor.process(KTableKTableRightJoin.java:89)
    at org.apache.kafka.streams.kstream.internals.KTableKTableRightJoin$KTableKTableRightJoinProcessor.process(KTableKTableRightJoin.java:58)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
    at org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply(ForwardingCacheFlushListener.java:40)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:101)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$000(CachingKeyValueStore.java:38)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(CachingKeyValueStore.java:83)
    at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:141)
    at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:99)
    at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:125)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:123)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.close(CachingKeyValueStore.java:132)
    at org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractStateStore.close(WrappedStateStore.java:89)
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.close(ProcessorStateManager.java:269)
    at org.apache.kafka.streams.processor.internals.AbstractTask.closeStateManager(AbstractTask.java:245)
    at org.apache.kafka.streams.processor.internals.StreamTask.closeSuspended(StreamTask.java:546)
    at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:624)
    at org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:410)
    at org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260)
    at org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1172)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747)

If the store is not closed, we have observed that the RocksDB lock is not
released properly, which can lead to:

2018-10-04 12:18:59,342 ERROR [stderr] (...-StreamThread-6) -   Caused by: org.rocksdb.RocksDBException: While lock file: ...-STATE-STORE-0000000038/LOCK: No locks available
2018-10-04 12:18:59,342 ERROR [stderr] (...-StreamThread-6) -   at org.rocksdb.RocksDB.open(Native Method)
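
One possible way to make close() robust against a failing flush() is to nest
the calls in try/finally, so that the underlying store and the cache are
closed even when flush() throws. The following is only a minimal sketch under
stated assumptions, not the actual Kafka Streams code and not necessarily the
fix that will be applied: CloseSketch, UnderlyingStore, Cache, CachingStore,
closeAsReported and closeDefensively are hypothetical names, the write lock is
omitted, and an IllegalStateException stands in for the
InvalidStateStoreException seen in the trace.

```java
// Minimal, self-contained sketch. Every class here is a hypothetical stand-in,
// not the actual Kafka Streams implementation.
class CloseSketch {

    /** Stand-in for the wrapped store (RocksDB-backed in the report). */
    static class UnderlyingStore {
        boolean closed = false;

        void flush() { /* nothing to persist in this sketch */ }

        void close() { closed = true; }
    }

    /** Stand-in for the cache; its flush can be made to fail on demand. */
    static class Cache {
        private final boolean failOnFlush;
        boolean closed = false;

        Cache(boolean failOnFlush) { this.failOnFlush = failOnFlush; }

        void flush() {
            if (failOnFlush) {
                // Simulates the exception raised while forwarding during flush.
                throw new IllegalStateException("another store is currently closed");
            }
        }

        void close() { closed = true; }
    }

    /** Stand-in for the caching store that wraps the two objects above. */
    static class CachingStore {
        private final Cache cache;
        private final UnderlyingStore underlying;

        CachingStore(Cache cache, UnderlyingStore underlying) {
            this.cache = cache;
            this.underlying = underlying;
        }

        void flush() {
            cache.flush();
            underlying.flush();
        }

        /** Same shape as the close() quoted in the report. */
        void closeAsReported() {
            flush();
            underlying.close();
            cache.close();
        }

        /** Nested try/finally: both close calls run even if flush() throws. */
        void closeDefensively() {
            try {
                flush();
            } finally {
                try {
                    underlying.close();
                } finally {
                    cache.close();
                }
            }
        }
    }

    public static void main(String[] args) {
        UnderlyingStore reported = new UnderlyingStore();
        try {
            new CachingStore(new Cache(true), reported).closeAsReported();
        } catch (IllegalStateException e) {
            System.out.println("as reported: underlying closed = " + reported.closed);
        }

        UnderlyingStore defensive = new UnderlyingStore();
        try {
            new CachingStore(new Cache(true), defensive).closeDefensively();
        } catch (IllegalStateException e) {
            System.out.println("defensive:   underlying closed = " + defensive.closed);
        }
    }
}
```

In this sketch the exception from flush() still propagates to the caller in
both variants; the only difference is whether the close calls get a chance to
run first.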


