You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "Guozhang Wang (Jira)" <ji...@apache.org> on 2020/04/18 18:44:00 UTC

[jira] [Commented] (KAFKA-9880) Error while range compacting during bulk loading of FIFO compacted RocksDB Store

    [ https://issues.apache.org/jira/browse/KAFKA-9880?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086589#comment-17086589 ] 

Guozhang Wang commented on KAFKA-9880:
--------------------------------------

Thanks for reporting this [~Dealus]. I think an appropriate solution would be, inside `toggleDbForBulkLoading` we should not set targetLevel to 1 but use the other overloaded `compactRange` which set it to -1.

WDYT [~cadonna] [~mjsax]?

> Error while range compacting during bulk loading of FIFO compacted RocksDB Store
> --------------------------------------------------------------------------------
>
>                 Key: KAFKA-9880
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9880
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.4.1
>            Reporter: Nicolas Carlot
>            Priority: Major
>
>  
> When restoring a non empty RocksDB state store, if it is customized to use FIFOCompaction, the following exception is thrown:
>  
> {code:java}
> //
> org.apache.kafka.streams.errors.ProcessorStateException: Error while range compacting during restoring  store merge_store
>         at org.apache.kafka.streams.state.internals.RocksDBStore$SingleColumnFamilyAccessor.toggleDbForBulkLoading(RocksDBStore.java:615) ~[kafka-stream-router.jar:?]
>         at org.apache.kafka.streams.state.internals.RocksDBStore.toggleDbForBulkLoading(RocksDBStore.java:398) ~[kafka-stream-router.jar:?]
>         at org.apache.kafka.streams.state.internals.RocksDBStore$RocksDBBatchingRestoreCallback.onRestoreStart(RocksDBStore.java:644) ~[kafka-stream-router.jar:?]
>         at org.apache.kafka.streams.processor.internals.CompositeRestoreListener.onRestoreStart(CompositeRestoreListener.java:59) ~[kafka-stream-router.jar:?]
>         at org.apache.kafka.streams.processor.internals.StateRestorer.restoreStarted(StateRestorer.java:76) ~[kafka-stream-router.jar:?]
>         at org.apache.kafka.streams.processor.internals.StoreChangelogReader.startRestoration(StoreChangelogReader.java:211) ~[kafka-stream-router.jar:?]
>         at org.apache.kafka.streams.processor.internals.StoreChangelogReader.initialize(StoreChangelogReader.java:185) ~[kafka-stream-router.jar:?]
>         at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:81) ~[kafka-stream-router.jar:?]
>         at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:389) ~[kafka-stream-router.jar:?]
>         at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:769) ~[kafka-stream-router.jar:?]
>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698) ~[kafka-stream-router.jar:?]
>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671) [kafka-stream-router.jar:?]
> Caused by: org.rocksdb.RocksDBException: Target level exceeds number of levels
>         at org.rocksdb.RocksDB.compactRange(Native Method) ~[kafka-stream-router.jar:?]
>         at org.rocksdb.RocksDB.compactRange(RocksDB.java:2636) ~[kafka-stream-router.jar:?]
>         at org.apache.kafka.streams.state.internals.RocksDBStore$SingleColumnFamilyAccessor.toggleDbForBulkLoading(RocksDBStore.java:613) ~[kafka-stream-router.jar:?]
>         ... 11 more
> {code}
>  
>  
> Compaction is configured through an implementation of RocksDBConfigSetter. The exception si gone as soon as I remove:
> {code:java}
> // 
> CompactionOptionsFIFO fifoOptions = new CompactionOptionsFIFO();
> fifoOptions.setMaxTableFilesSize(maxSize);
> fifoOptions.setAllowCompaction(true);
> options.setCompactionOptionsFIFO(fifoOptions);
> options.setCompactionStyle(CompactionStyle.FIFO);
> {code}
>  
>  
> Bulk loading works fine when the store is non-existent / empty. This occurs only when there are a minimum amount of data in it. I guess it happens when the amount SST layers is increased.
> I'm currently using a forked version of Kafka 2.4.1 customizing the RocksDBStore class with this modification as a work around:
>  
> {code:java}
> //
> public void toggleDbForBulkLoading() {
>       try {
>         db.compactRange(columnFamily, true, 1, 0);
>       } catch (final RocksDBException e) {
>         try {
>           if (columnFamily.getDescriptor().getOptions().compactionStyle() != CompactionStyle.FIFO) {
>             throw new ProcessorStateException("Error while range compacting during restoring  store " + name, e);
>           }
>           else {
>             log.warn("Compaction of store " + name + " for bulk loading failed. Will continue without compacted store, which will be slower.", e);
>           }
>         } catch (RocksDBException e1) {
>           throw new ProcessorStateException("Error while range compacting during restoring  store " + name, e);
>         }
>       }
>     }
> {code}
>  
> I'm not very proud of this workaround, but it suits my use cases well.
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)