Posted to jira@kafka.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/10/24 00:35:00 UTC

[jira] [Commented] (KAFKA-7534) Error during CachingKeyValueStore.flush may not allow RocksDB to close

    [ https://issues.apache.org/jira/browse/KAFKA-7534?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16661522#comment-16661522 ] 

ASF GitHub Bot commented on KAFKA-7534:
---------------------------------------

bbejeck opened a new pull request #5833: KAFKA-7534: Error in flush calling close may prevent underlying store from closing
URL: https://github.com/apache/kafka/pull/5833
 
 
   `CachingKeyValueStore#close()` first calls `CachingKeyValueStore#flush()`.  If an exception is thrown during the `flush` call, the underlying store is never closed.  A task that is subsequently assigned the same store then can't open RocksDB and fails with a `No locks available` exception, because the previous instance still holds the LOCK file.
   
   I added a unit test that fails without the proposed fix.
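   The shape of the fix can be sketched as follows. This is a minimal, self-contained sketch, not the actual Kafka classes: `StoreSketch`, `closeBuggy`, `closeFixed`, and the boolean flags are hypothetical stand-ins for `underlying.close()` and `cache.close(cacheName)`. The point is that wrapping `flush()` in try/finally guarantees the close calls run even when flushing throws.

   ```java
   // Hypothetical sketch of the close()-after-failed-flush problem.
   public class StoreSketch {
       static boolean underlyingClosed = false;

       // Simulates CachingKeyValueStore#flush() failing, e.g. because a
       // downstream processor hits an already-closed sibling store.
       static void flush() {
           throw new RuntimeException("flush failed");
       }

       // Buggy variant: the exception from flush() propagates before the
       // close call runs, so the RocksDB LOCK file is never released.
       static void closeBuggy() {
           flush();
           underlyingClosed = true; // stands in for underlying.close()
       }

       // Fixed variant: try/finally ensures the underlying store is
       // closed no matter what flush() does.
       static void closeFixed() {
           try {
               flush();
           } finally {
               underlyingClosed = true; // stands in for underlying.close()
           }
       }

       public static void main(String[] args) {
           try { closeBuggy(); } catch (RuntimeException e) { /* expected */ }
           System.out.println("buggy:  underlyingClosed=" + underlyingClosed);

           underlyingClosed = false;
           try { closeFixed(); } catch (RuntimeException e) { /* expected */ }
           System.out.println("fixed:  underlyingClosed=" + underlyingClosed);
       }
   }
   ```

   A variant of the fixed shape could also collect the flush exception and rethrow it after the closes, so the caller still sees the original failure.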
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



> Error during CachingKeyValueStore.flush may not allow RocksDB to close
> ----------------------------------------------------------------------
>
>                 Key: KAFKA-7534
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7534
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Bill Bejeck
>            Assignee: Bill Bejeck
>            Priority: Major
>
> @Override
> public void flush() {
>     lock.writeLock().lock();
>     try {
>         cache.flush(cacheName);
>         underlying.flush();
>     } finally {
>         lock.writeLock().unlock();
>     }
> }
> @Override
> public void close() {
>     flush();
>     underlying.close();
>     cache.close(cacheName);
> }
> An exception that leads to this situation; note that another store is
> already closed and therefore not available:
> 2018-10-04 12:18:44,961 ERROR [org.apache.kafka.streams.processor.internals.ProcessorStateManager] (...-StreamThread-8) - task [8_11] Failed to close state store ...-STATE-STORE-0000000038: :
> org.apache.kafka.streams.errors.InvalidStateStoreException: Store KSTREAM-REDUCE-STATE-STORE-0000000025 is currently closed.
>     at org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractStateStore.validateStoreOpen(WrappedStateStore.java:70)
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:150)
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:38)
>     at org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.get(InnerMeteredKeyValueStore.java:186)
>     at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.get(MeteredKeyValueBytesStore.java:112)
>     at org.apache.kafka.streams.kstream.internals.KStreamReduce$KStreamReduceValueGetter.get(KStreamReduce.java:124)
>     at org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterValueGetter.get(KTableFilter.java:132)
>     at org.apache.kafka.streams.kstream.internals.KTableKTableRightJoin$KTableKTableRightJoinProcessor.process(KTableKTableRightJoin.java:89)
>     at org.apache.kafka.streams.kstream.internals.KTableKTableRightJoin$KTableKTableRightJoinProcessor.process(KTableKTableRightJoin.java:58)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
>     at org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply(ForwardingCacheFlushListener.java:40)
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:101)
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$000(CachingKeyValueStore.java:38)
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(CachingKeyValueStore.java:83)
>     at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:141)
>     at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:99)
>     at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:125)
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:123)
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.close(CachingKeyValueStore.java:132)
>     at org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractStateStore.close(WrappedStateStore.java:89)
>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.close(ProcessorStateManager.java:269)
>     at org.apache.kafka.streams.processor.internals.AbstractTask.closeStateManager(AbstractTask.java:245)
>     at org.apache.kafka.streams.processor.internals.StreamTask.closeSuspended(StreamTask.java:546)
>     at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:624)
>     at org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:410)
>     at org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260)
>     at org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1172)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747)
> If the store is not closed, we have witnessed that the RocksDB lock is
> not released properly, which can lead to:
> 2018-10-04 12:18:59,342 ERROR [stderr] (...-StreamThread-6) -   Caused by: org.rocksdb.RocksDBException: While lock file: ...-STATE-STORE-0000000038/LOCK: No locks available
> 2018-10-04 12:18:59,342 ERROR [stderr] (...-StreamThread-6) -   at org.rocksdb.RocksDB.open(Native Method)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)