Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/04/20 09:08:54 UTC

[GitHub] [kafka] cadonna opened a new pull request #10568: KAFKA-8897: Upgrade RocksDB

cadonna opened a new pull request #10568:
URL: https://github.com/apache/kafka/pull/10568


   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on pull request #10568: KAFKA-8897: Upgrade RocksDB to 6.19.3

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #10568:
URL: https://github.com/apache/kafka/pull/10568#issuecomment-892231916


   @codefactor it will be released in 3.0, which is currently in the final stages of testing and preparing for release. 





[GitHub] [kafka] guozhangwang commented on a change in pull request #10568: KAFKA-8897: Upgrade RocksDB to 6.16.4

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #10568:
URL: https://github.com/apache/kafka/pull/10568#discussion_r626249438



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java
##########
@@ -99,35 +104,7 @@ public Env getEnv() {
 
     @Override
     public Options prepareForBulkLoad() {
-        /* From https://github.com/facebook/rocksdb/wiki/RocksDB-FAQ
-         *
-         * Q: What's the fastest way to load data into RocksDB?
-         *
-         * A: A fast way to direct insert data to the DB:
-         *
-         *  1. using single writer thread and insert in sorted order
-         *  2. batch hundreds of keys into one write batch
-         *  3. use vector memtable
-         *  4. make sure options.max_background_flushes is at least 4
-         *  5. before inserting the data,
-         *       disable automatic compaction,
-         *       set options.level0_file_num_compaction_trigger,
-         *           options.level0_slowdown_writes_trigger
-         *           and options.level0_stop_writes_trigger to very large.
-         *     After inserting all the data, issue a manual compaction.
-         *
-         * 3-5 will be automatically done if you call Options::PrepareForBulkLoad() to your option
-         */
-        // (1) not in our control
-        // (2) is done via bulk-loading API
-        // (3) skipping because, not done in actual PrepareForBulkLoad() code in https://github.com/facebook/rocksdb/blob/master/options/options.cc
-        //columnFamilyOptions.setMemTableConfig(new VectorMemTableConfig());
-        // (4-5) below:
-        dbOptions.setMaxBackgroundFlushes(4);
-        columnFamilyOptions.setDisableAutoCompactions(true);
-        columnFamilyOptions.setLevel0FileNumCompactionTrigger(1 << 30);
-        columnFamilyOptions.setLevel0SlowdownWritesTrigger(1 << 30);
-        columnFamilyOptions.setLevel0StopWritesTrigger(1 << 30);

Review comment:
       I left a comment on the old PR as well: I vaguely remember that newer versions do this inside RocksDB, so we no longer need it, but maybe @cadonna can confirm here. If so, could we add a one-line comment for future readers?







[GitHub] [kafka] ableegoldman commented on a change in pull request #10568: KAFKA-8897: Upgrade RocksDB to 6.16.4

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #10568:
URL: https://github.com/apache/kafka/pull/10568#discussion_r626056309



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
##########
@@ -411,13 +408,25 @@ private StateConsumer initialize() {
 
             return stateConsumer;
         } catch (final StreamsException fatalException) {
+            closeStateConsumer(stateConsumer, false);

Review comment:
       Ah, so we were also possibly leaking stores in case of a StreamsException or other Exception? Yikes
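
The hunk above adds a `closeStateConsumer(stateConsumer, false)` call on the exception path. A minimal sketch of that clean-up-on-failure pattern follows. `FakeConsumer`, `InitializeSketch`, and `initialize` are hypothetical stand-ins, not the Kafka Streams classes, and rethrowing the exception is an assumption made only to keep the example short.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a consumer that holds open state stores.
class FakeConsumer {
    private final List<String> events;

    FakeConsumer(final List<String> events) {
        this.events = events;
        events.add("opened");
    }

    void restore(final boolean fail) {
        if (fail) {
            throw new IllegalStateException("restore failed");
        }
        events.add("restored");
    }

    void close() {
        events.add("closed");
    }
}

class InitializeSketch {
    // Returns the consumer on success. On failure it closes the consumer
    // before rethrowing, so whatever it opened is not leaked.
    static FakeConsumer initialize(final List<String> events, final boolean fail) {
        FakeConsumer consumer = null;
        try {
            consumer = new FakeConsumer(events);
            consumer.restore(fail);
            return consumer;
        } catch (final RuntimeException fatal) {
            if (consumer != null) {
                consumer.close();
            }
            throw fatal;
        }
    }

    public static void main(final String[] args) {
        final List<String> events = new ArrayList<>();
        try {
            initialize(events, true);
        } catch (final IllegalStateException expected) {
            System.out.println(events); // prints [opened, closed]
        }
    }
}
```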

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
@@ -396,6 +396,21 @@ public void close() {
                 log.info("Skipping to close non-initialized store {}", entry.getKey());
             }
         }
+        for (final StateStore store : globalStateStores) {

Review comment:
       Yikes, so it actually was a (possible) leak in production code? I'm almost glad RocksDB had a bug, otherwise we might never have caught it 😅
   
   Definite +1 on consolidating these, can you file a quick ticket for this? It almost sounds like a bug in itself that one might contain stores that the other does not, but I suppose as long as they all get closed it's probably ok.
   
   I'd bet you could find a community member to pick it up by labeling it `newbie` or `newbie++`.
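
One way to picture "as long as they all get closed": walk both collections and close each store exactly once, even if it appears in both. This is only a sketch. `NamedStore`, `CloseAllSketch`, and `closeAll` are hypothetical, and the parameter types are simplified; only the two variable names come from the diff.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for a state store; not the Kafka Streams interface.
class NamedStore {
    final String name;
    boolean open = true;

    NamedStore(final String name) {
        this.name = name;
    }

    void close() {
        open = false;
    }
}

class CloseAllSketch {
    // Closes every store found in either collection exactly once and
    // returns the store names in the order they were closed.
    static List<String> closeAll(final Collection<NamedStore> globalStores,
                                 final Collection<NamedStore> globalStateStores) {
        // Identity-based set: a store present in both collections is closed once.
        final Set<NamedStore> seen =
            Collections.newSetFromMap(new IdentityHashMap<NamedStore, Boolean>());
        final List<String> closed = new ArrayList<>();
        closeUnseen(globalStores, seen, closed);
        closeUnseen(globalStateStores, seen, closed);
        return closed;
    }

    private static void closeUnseen(final Collection<NamedStore> stores,
                                    final Set<NamedStore> seen,
                                    final List<String> closed) {
        for (final NamedStore store : stores) {
            if (seen.add(store)) {
                store.close();
                closed.add(store.name);
            }
        }
    }

    public static void main(final String[] args) {
        final NamedStore a = new NamedStore("a");
        final NamedStore b = new NamedStore("b");
        final List<NamedStore> first = new ArrayList<>();
        first.add(a);
        final List<NamedStore> second = new ArrayList<>();
        second.add(a);
        second.add(b);
        System.out.println(closeAll(first, second)); // prints [a, b]
    }
}
```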

##########
File path: gradle/dependencies.gradle
##########
@@ -103,7 +103,7 @@ versions += [
   netty: "4.1.62.Final",
   powermock: "2.0.9",
   reflections: "0.9.12",
-  rocksDB: "5.18.4",
+  rocksDB: "6.16.4",

Review comment:
       🥳 

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
@@ -374,7 +374,7 @@ public void flush() {
 
     @Override
     public void close() {
-        if (globalStores.isEmpty()) {
+        if (globalStateStores.isEmpty() && globalStores.isEmpty()) {

Review comment:
       What even is the difference between `globalStateStores` and `globalStores`? I'm not surprised we had a leak, I'm pretty sure if I was looking at this code I wouldn't remember that these were actually two different variables. Maybe we can give them slightly more descriptive names in the meantime?

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java
##########
@@ -99,35 +104,7 @@ public Env getEnv() {
 
     @Override
     public Options prepareForBulkLoad() {
-        /* From https://github.com/facebook/rocksdb/wiki/RocksDB-FAQ
-         *
-         * Q: What's the fastest way to load data into RocksDB?
-         *
-         * A: A fast way to direct insert data to the DB:
-         *
-         *  1. using single writer thread and insert in sorted order
-         *  2. batch hundreds of keys into one write batch
-         *  3. use vector memtable
-         *  4. make sure options.max_background_flushes is at least 4
-         *  5. before inserting the data,
-         *       disable automatic compaction,
-         *       set options.level0_file_num_compaction_trigger,
-         *           options.level0_slowdown_writes_trigger
-         *           and options.level0_stop_writes_trigger to very large.
-         *     After inserting all the data, issue a manual compaction.
-         *
-         * 3-5 will be automatically done if you call Options::PrepareForBulkLoad() to your option
-         */
-        // (1) not in our control
-        // (2) is done via bulk-loading API
-        // (3) skipping because, not done in actual PrepareForBulkLoad() code in https://github.com/facebook/rocksdb/blob/master/options/options.cc
-        //columnFamilyOptions.setMemTableConfig(new VectorMemTableConfig());
-        // (4-5) below:
-        dbOptions.setMaxBackgroundFlushes(4);
-        columnFamilyOptions.setDisableAutoCompactions(true);
-        columnFamilyOptions.setLevel0FileNumCompactionTrigger(1 << 30);
-        columnFamilyOptions.setLevel0SlowdownWritesTrigger(1 << 30);
-        columnFamilyOptions.setLevel0StopWritesTrigger(1 << 30);

Review comment:
       I seem to remember there was some reason we didn't remove this when we disabled bulk loading...but I don't remember what that was. @guozhangwang do you?
   
   Or were these options just removed in 6.x?

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java
##########
@@ -1381,6 +1364,313 @@ public WriteBufferManager writeBufferManager() {
         return dbOptions.writeBufferManager();
     }
 
+    @Override
+    public Options setMaxWriteBatchGroupSizeBytes(final long maxWriteBatchGroupSizeBytes) {
+        dbOptions.setMaxWriteBatchGroupSizeBytes(maxWriteBatchGroupSizeBytes);
+        return this;
+    }
+
+    @Override
+    public long maxWriteBatchGroupSizeBytes() {
+        return dbOptions.maxWriteBatchGroupSizeBytes();
+    }
+
+    @Override
+    public Options oldDefaults(final int majorVersion, final int minorVersion) {
+        columnFamilyOptions.oldDefaults(majorVersion, minorVersion);
+        return this;
+    }
+
+    @Override
+    public Options optimizeForSmallDb(final Cache cache) {

Review comment:
       > Use this if your DB is very small (like under 1GB) and you don't want to spend lots of memory for memtables.
   
   This looks interesting 🤔 Could be very useful for some users like ksqlDB, where many applications have small state stores. cc @rodesai

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java
##########
@@ -1381,6 +1364,313 @@ public WriteBufferManager writeBufferManager() {
         return dbOptions.writeBufferManager();
     }
 
+    @Override
+    public Options setMaxWriteBatchGroupSizeBytes(final long maxWriteBatchGroupSizeBytes) {
+        dbOptions.setMaxWriteBatchGroupSizeBytes(maxWriteBatchGroupSizeBytes);
+        return this;
+    }
+
+    @Override
+    public long maxWriteBatchGroupSizeBytes() {
+        return dbOptions.maxWriteBatchGroupSizeBytes();
+    }
+
+    @Override
+    public Options oldDefaults(final int majorVersion, final int minorVersion) {
+        columnFamilyOptions.oldDefaults(majorVersion, minorVersion);
+        return this;
+    }
+
+    @Override
+    public Options optimizeForSmallDb(final Cache cache) {
+        return super.optimizeForSmallDb(cache);
+    }
+
+    @Override
+    public AbstractCompactionFilter<? extends AbstractSlice<?>> compactionFilter() {
+        return columnFamilyOptions.compactionFilter();
+    }
+
+    @Override
+    public AbstractCompactionFilterFactory<? extends AbstractCompactionFilter<?>> compactionFilterFactory() {
+        return columnFamilyOptions.compactionFilterFactory();
+    }
+
+    @Override
+    public Options setStatsPersistPeriodSec(final int statsPersistPeriodSec) {
+        dbOptions.setStatsPersistPeriodSec(statsPersistPeriodSec);
+        return this;
+    }
+
+    @Override
+    public int statsPersistPeriodSec() {
+        return dbOptions.statsPersistPeriodSec();
+    }
+
+    @Override
+    public Options setStatsHistoryBufferSize(final long statsHistoryBufferSize) {
+        dbOptions.setStatsHistoryBufferSize(statsHistoryBufferSize);
+        return this;
+    }
+
+    @Override
+    public long statsHistoryBufferSize() {
+        return dbOptions.statsHistoryBufferSize();
+    }
+
+    @Override
+    public Options setStrictBytesPerSync(final boolean strictBytesPerSync) {
+        dbOptions.setStrictBytesPerSync(strictBytesPerSync);
+        return this;
+    }
+
+    @Override
+    public boolean strictBytesPerSync() {
+        return dbOptions.strictBytesPerSync();
+    }
+
+    @Override
+    public Options setListeners(final List<AbstractEventListener> listeners) {
+        dbOptions.setListeners(listeners);
+        return this;
+    }
+
+    @Override
+    public List<AbstractEventListener> listeners() {
+        return dbOptions.listeners();
+    }
+
+    @Override
+    public Options setEnablePipelinedWrite(final boolean enablePipelinedWrite) {
+        dbOptions.setEnablePipelinedWrite(enablePipelinedWrite);
+        return this;
+    }
+
+    @Override
+    public boolean enablePipelinedWrite() {
+        return dbOptions.enablePipelinedWrite();
+    }
+
+    @Override
+    public Options setUnorderedWrite(final boolean unorderedWrite) {
+        dbOptions.setUnorderedWrite(unorderedWrite);
+        return this;
+    }
+
+    @Override
+    public boolean unorderedWrite() {
+        return dbOptions.unorderedWrite();
+    }
+
+    @Override
+    public Options setSkipCheckingSstFileSizesOnDbOpen(final boolean skipCheckingSstFileSizesOnDbOpen) {
+        dbOptions.setSkipCheckingSstFileSizesOnDbOpen(skipCheckingSstFileSizesOnDbOpen);
+        return this;
+    }
+
+    @Override
+    public boolean skipCheckingSstFileSizesOnDbOpen() {
+        return dbOptions.skipCheckingSstFileSizesOnDbOpen();
+    }
+
+    @Override
+    public Options setWalFilter(final AbstractWalFilter walFilter) {
+        dbOptions.setWalFilter(walFilter);

Review comment:
       Should we disable/throw/ignore this, since we disable the WAL? Same for the other `wal` options
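
Of the three choices raised above, "ignore" could look roughly like this: the setter records a warning and never forwards the value, while still returning `this` so fluent call chains keep working. Everything here is hypothetical (`AdapterSketch`, `WalFilterStub`, the warning text) and is meant only to illustrate the option, not to describe what the PR does.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical placeholder for a WAL filter type.
class WalFilterStub {
}

// Hypothetical adapter that ignores WAL-related options.
class AdapterSketch {
    // Stands in for a logger so the sketch has no external dependencies.
    final List<String> warnings = new ArrayList<>();

    // The filter is never stored, so the getter below always returns null.
    private final WalFilterStub walFilter = null;

    // Fluent setter that drops the value because the WAL is disabled.
    AdapterSketch setWalFilter(final WalFilterStub filter) {
        warnings.add("WAL is disabled; ignoring option 'walFilter'");
        return this;
    }

    WalFilterStub walFilter() {
        return walFilter;
    }

    public static void main(final String[] args) {
        final AdapterSketch adapter = new AdapterSketch().setWalFilter(new WalFilterStub());
        System.out.println(adapter.warnings.size()); // prints 1
    }
}
```

Throwing instead would surface the misconfiguration earlier, at the cost of breaking any existing config setter that already calls a `wal` option.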

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java
##########
@@ -1381,6 +1364,313 @@ public WriteBufferManager writeBufferManager() {
         return dbOptions.writeBufferManager();
     }
 
+    @Override
+    public Options setMaxWriteBatchGroupSizeBytes(final long maxWriteBatchGroupSizeBytes) {
+        dbOptions.setMaxWriteBatchGroupSizeBytes(maxWriteBatchGroupSizeBytes);
+        return this;
+    }
+
+    @Override
+    public long maxWriteBatchGroupSizeBytes() {
+        return dbOptions.maxWriteBatchGroupSizeBytes();
+    }
+
+    @Override
+    public Options oldDefaults(final int majorVersion, final int minorVersion) {
+        columnFamilyOptions.oldDefaults(majorVersion, minorVersion);
+        return this;
+    }
+
+    @Override
+    public Options optimizeForSmallDb(final Cache cache) {
+        return super.optimizeForSmallDb(cache);
+    }
+
+    @Override
+    public AbstractCompactionFilter<? extends AbstractSlice<?>> compactionFilter() {
+        return columnFamilyOptions.compactionFilter();
+    }
+
+    @Override
+    public AbstractCompactionFilterFactory<? extends AbstractCompactionFilter<?>> compactionFilterFactory() {
+        return columnFamilyOptions.compactionFilterFactory();
+    }
+
+    @Override
+    public Options setStatsPersistPeriodSec(final int statsPersistPeriodSec) {
+        dbOptions.setStatsPersistPeriodSec(statsPersistPeriodSec);
+        return this;
+    }
+
+    @Override
+    public int statsPersistPeriodSec() {
+        return dbOptions.statsPersistPeriodSec();
+    }
+
+    @Override
+    public Options setStatsHistoryBufferSize(final long statsHistoryBufferSize) {
+        dbOptions.setStatsHistoryBufferSize(statsHistoryBufferSize);
+        return this;
+    }
+
+    @Override
+    public long statsHistoryBufferSize() {
+        return dbOptions.statsHistoryBufferSize();
+    }
+
+    @Override
+    public Options setStrictBytesPerSync(final boolean strictBytesPerSync) {
+        dbOptions.setStrictBytesPerSync(strictBytesPerSync);
+        return this;
+    }
+
+    @Override
+    public boolean strictBytesPerSync() {
+        return dbOptions.strictBytesPerSync();
+    }
+
+    @Override
+    public Options setListeners(final List<AbstractEventListener> listeners) {
+        dbOptions.setListeners(listeners);
+        return this;
+    }
+
+    @Override
+    public List<AbstractEventListener> listeners() {
+        return dbOptions.listeners();
+    }
+
+    @Override
+    public Options setEnablePipelinedWrite(final boolean enablePipelinedWrite) {
+        dbOptions.setEnablePipelinedWrite(enablePipelinedWrite);
+        return this;
+    }
+
+    @Override
+    public boolean enablePipelinedWrite() {
+        return dbOptions.enablePipelinedWrite();
+    }
+
+    @Override
+    public Options setUnorderedWrite(final boolean unorderedWrite) {
+        dbOptions.setUnorderedWrite(unorderedWrite);
+        return this;
+    }
+
+    @Override
+    public boolean unorderedWrite() {
+        return dbOptions.unorderedWrite();
+    }
+
+    @Override
+    public Options setSkipCheckingSstFileSizesOnDbOpen(final boolean skipCheckingSstFileSizesOnDbOpen) {
+        dbOptions.setSkipCheckingSstFileSizesOnDbOpen(skipCheckingSstFileSizesOnDbOpen);
+        return this;
+    }
+
+    @Override
+    public boolean skipCheckingSstFileSizesOnDbOpen() {
+        return dbOptions.skipCheckingSstFileSizesOnDbOpen();
+    }
+
+    @Override
+    public Options setWalFilter(final AbstractWalFilter walFilter) {
+        dbOptions.setWalFilter(walFilter);
+        return this;
+    }
+
+    @Override
+    public WalFilter walFilter() {
+        return dbOptions.walFilter();
+    }
+
+    @Override
+    public Options setAllowIngestBehind(final boolean allowIngestBehind) {
+        dbOptions.setAllowIngestBehind(allowIngestBehind);
+        return this;
+    }
+
+    @Override
+    public boolean allowIngestBehind() {
+        return dbOptions.allowIngestBehind();
+    }
+
+    @Override
+    public Options setPreserveDeletes(final boolean preserveDeletes) {
+        dbOptions.setPreserveDeletes(preserveDeletes);
+        return this;
+    }
+
+    @Override
+    public boolean preserveDeletes() {
+        return dbOptions.preserveDeletes();
+    }
+
+    @Override
+    public Options setTwoWriteQueues(final boolean twoWriteQueues) {
+        dbOptions.setTwoWriteQueues(twoWriteQueues);
+        return this;
+    }
+
+    @Override
+    public boolean twoWriteQueues() {
+        return dbOptions.twoWriteQueues();
+    }
+
+    @Override
+    public Options setManualWalFlush(final boolean manualWalFlush) {
+        dbOptions.setManualWalFlush(manualWalFlush);
+        return this;
+    }
+
+    @Override
+    public boolean manualWalFlush() {
+        return dbOptions.manualWalFlush();
+    }
+
+    @Override
+    public Options setCfPaths(final Collection<DbPath> cfPaths) {
+        columnFamilyOptions.setCfPaths(cfPaths);
+        return this;
+    }
+
+    @Override
+    public List<DbPath> cfPaths() {
+        return columnFamilyOptions.cfPaths();
+    }
+
+    @Override
+    public Options setBottommostCompressionOptions(final CompressionOptions bottommostCompressionOptions) {
+        columnFamilyOptions.setBottommostCompressionOptions(bottommostCompressionOptions);
+        return this;
+    }
+
+    @Override
+    public CompressionOptions bottommostCompressionOptions() {
+        return columnFamilyOptions.bottommostCompressionOptions();
+    }
+
+    @Override
+    public Options setTtl(final long ttl) {
+        columnFamilyOptions.setTtl(ttl);
+        return this;
+    }
+
+    @Override
+    public long ttl() {
+        return columnFamilyOptions.ttl();
+    }
+
+    @Override
+    public Options setAtomicFlush(final boolean atomicFlush) {
+        dbOptions.setAtomicFlush(atomicFlush);
+        return this;
+    }
+
+    @Override
+    public boolean atomicFlush() {
+        return dbOptions.atomicFlush();
+    }
+
+    @Override
+    public Options setAvoidUnnecessaryBlockingIO(final boolean avoidUnnecessaryBlockingIO) {
+        dbOptions.setAvoidUnnecessaryBlockingIO(avoidUnnecessaryBlockingIO);
+        return this;
+    }
+
+    @Override
+    public boolean avoidUnnecessaryBlockingIO() {
+        return dbOptions.avoidUnnecessaryBlockingIO();
+    }
+
+    @Override
+    public Options setPersistStatsToDisk(final boolean persistStatsToDisk) {
+        dbOptions.setPersistStatsToDisk(persistStatsToDisk);
+        return this;
+    }
+
+    @Override
+    public boolean persistStatsToDisk() {
+        return dbOptions.persistStatsToDisk();
+    }
+
+    @Override
+    public Options setWriteDbidToManifest(final boolean writeDbidToManifest) {
+        dbOptions.setWriteDbidToManifest(writeDbidToManifest);
+        return this;
+    }
+
+    @Override
+    public boolean writeDbidToManifest() {
+        return dbOptions.writeDbidToManifest();
+    }
+
+    @Override
+    public Options setLogReadaheadSize(final long logReadaheadSize) {
+        dbOptions.setLogReadaheadSize(logReadaheadSize);
+        return this;
+    }
+
+    @Override
+    public long logReadaheadSize() {
+        return dbOptions.logReadaheadSize();
+    }
+
+    @Override
+    public Options setBestEffortsRecovery(final boolean bestEffortsRecovery) {
+        dbOptions.setBestEffortsRecovery(bestEffortsRecovery);
+        return this;
+    }
+
+    @Override
+    public boolean bestEffortsRecovery() {
+        return dbOptions.bestEffortsRecovery();
+    }
+
+    @Override
+    public Options setMaxBgErrorResumeCount(final int maxBgerrorResumeCount) {
+        dbOptions.setMaxBgErrorResumeCount(maxBgerrorResumeCount);
+        return this;
+    }
+
+    @Override
+    public int maxBgerrorResumeCount() {
+        return dbOptions.maxBgerrorResumeCount();
+    }
+
+    @Override
+    public Options setBgerrorResumeRetryInterval(final long bgerrorResumeRetryInterval) {
+        dbOptions.setBgerrorResumeRetryInterval(bgerrorResumeRetryInterval);
+        return this;
+    }
+
+    @Override
+    public long bgerrorResumeRetryInterval() {
+        return dbOptions.bgerrorResumeRetryInterval();
+    }
+
+    @Override
+    public Options setSstPartitionerFactory(final SstPartitionerFactory sstPartitionerFactory) {
+        columnFamilyOptions.setSstPartitionerFactory(sstPartitionerFactory);
+        return this;
+    }
+
+    @Override
+    public SstPartitionerFactory sstPartitionerFactory() {
+        return columnFamilyOptions.sstPartitionerFactory();
+    }
+
+    @Override
+    public Options setCompactionThreadLimiter(final ConcurrentTaskLimiter compactionThreadLimiter) {

Review comment:
       Another interesting config, as my impression has been that background compactions are one of the biggest limiting factors in scaling up the state stores and/or partitions. But I'm not yet sure how this differs from simply limiting the number of compaction threads.
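
On the open question above: as far as I understand the RocksDB option documentation, a compaction thread limiter caps the number of outstanding compaction tasks and can be shared by several column families, even across DB instances, whereas a background-thread count is configured per DB. That reading should be confirmed against the RocksDB docs. The sketch below is an analogy built on `java.util.concurrent.Semaphore`, not the RocksDB API; `SharedLimiter` and `StoreSketch` are hypothetical names.

```java
import java.util.concurrent.Semaphore;

// Hypothetical analogy: one cap shared by many stores.
class SharedLimiter {
    private final Semaphore permits;

    SharedLimiter(final int maxOutstanding) {
        this.permits = new Semaphore(maxOutstanding);
    }

    // Non-blocking: returns false when the shared cap is already reached.
    boolean tryStart() {
        return permits.tryAcquire();
    }

    void finish() {
        permits.release();
    }
}

// Hypothetical store that asks the shared limiter before compacting.
class StoreSketch {
    private final SharedLimiter limiter;

    StoreSketch(final SharedLimiter limiter) {
        this.limiter = limiter;
    }

    boolean tryStartCompaction() {
        return limiter.tryStart();
    }

    void finishCompaction() {
        limiter.finish();
    }

    public static void main(final String[] args) {
        final SharedLimiter limiter = new SharedLimiter(2);
        final StoreSketch s1 = new StoreSketch(limiter);
        final StoreSketch s2 = new StoreSketch(limiter);
        final StoreSketch s3 = new StoreSketch(limiter);
        System.out.println(s1.tryStartCompaction()); // prints true
        System.out.println(s2.tryStartCompaction()); // prints true
        System.out.println(s3.tryStartCompaction()); // prints false
    }
}
```

With a per-store thread count, three stores allowed two threads each could run six compactions at once; with the shared cap, the total stays at two no matter how many stores exist.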

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java
##########
@@ -1381,6 +1364,313 @@ public WriteBufferManager writeBufferManager() {
         return dbOptions.writeBufferManager();
     }
 
+    @Override
+    public Options setMaxWriteBatchGroupSizeBytes(final long maxWriteBatchGroupSizeBytes) {
+        dbOptions.setMaxWriteBatchGroupSizeBytes(maxWriteBatchGroupSizeBytes);
+        return this;
+    }
+
+    @Override
+    public long maxWriteBatchGroupSizeBytes() {
+        return dbOptions.maxWriteBatchGroupSizeBytes();
+    }
+
+    @Override
+    public Options oldDefaults(final int majorVersion, final int minorVersion) {
+        columnFamilyOptions.oldDefaults(majorVersion, minorVersion);
+        return this;
+    }
+
+    @Override
+    public Options optimizeForSmallDb(final Cache cache) {
+        return super.optimizeForSmallDb(cache);
+    }
+
+    @Override
+    public AbstractCompactionFilter<? extends AbstractSlice<?>> compactionFilter() {
+        return columnFamilyOptions.compactionFilter();
+    }
+
+    @Override
+    public AbstractCompactionFilterFactory<? extends AbstractCompactionFilter<?>> compactionFilterFactory() {
+        return columnFamilyOptions.compactionFilterFactory();
+    }
+
+    @Override
+    public Options setStatsPersistPeriodSec(final int statsPersistPeriodSec) {
+        dbOptions.setStatsPersistPeriodSec(statsPersistPeriodSec);
+        return this;
+    }
+
+    @Override
+    public int statsPersistPeriodSec() {
+        return dbOptions.statsPersistPeriodSec();
+    }
+
+    @Override
+    public Options setStatsHistoryBufferSize(final long statsHistoryBufferSize) {
+        dbOptions.setStatsHistoryBufferSize(statsHistoryBufferSize);
+        return this;
+    }
+
+    @Override
+    public long statsHistoryBufferSize() {
+        return dbOptions.statsHistoryBufferSize();
+    }
+
+    @Override
+    public Options setStrictBytesPerSync(final boolean strictBytesPerSync) {
+        dbOptions.setStrictBytesPerSync(strictBytesPerSync);
+        return this;
+    }
+
+    @Override
+    public boolean strictBytesPerSync() {
+        return dbOptions.strictBytesPerSync();
+    }
+
+    @Override
+    public Options setListeners(final List<AbstractEventListener> listeners) {
+        dbOptions.setListeners(listeners);
+        return this;
+    }
+
+    @Override
+    public List<AbstractEventListener> listeners() {
+        return dbOptions.listeners();
+    }
+
+    @Override
+    public Options setEnablePipelinedWrite(final boolean enablePipelinedWrite) {
+        dbOptions.setEnablePipelinedWrite(enablePipelinedWrite);
+        return this;
+    }
+
+    @Override
+    public boolean enablePipelinedWrite() {
+        return dbOptions.enablePipelinedWrite();
+    }
+
+    @Override
+    public Options setUnorderedWrite(final boolean unorderedWrite) {
+        dbOptions.setUnorderedWrite(unorderedWrite);
+        return this;
+    }
+
+    @Override
+    public boolean unorderedWrite() {
+        return dbOptions.unorderedWrite();
+    }
+
+    @Override
+    public Options setSkipCheckingSstFileSizesOnDbOpen(final boolean skipCheckingSstFileSizesOnDbOpen) {
+        dbOptions.setSkipCheckingSstFileSizesOnDbOpen(skipCheckingSstFileSizesOnDbOpen);
+        return this;
+    }
+
+    @Override
+    public boolean skipCheckingSstFileSizesOnDbOpen() {
+        return dbOptions.skipCheckingSstFileSizesOnDbOpen();
+    }
+
+    @Override
+    public Options setWalFilter(final AbstractWalFilter walFilter) {
+        dbOptions.setWalFilter(walFilter);
+        return this;
+    }
+
+    @Override
+    public WalFilter walFilter() {
+        return dbOptions.walFilter();
+    }
+
+    @Override
+    public Options setAllowIngestBehind(final boolean allowIngestBehind) {
+        dbOptions.setAllowIngestBehind(allowIngestBehind);
+        return this;
+    }
+
+    @Override
+    public boolean allowIngestBehind() {
+        return dbOptions.allowIngestBehind();
+    }
+
+    @Override
+    public Options setPreserveDeletes(final boolean preserveDeletes) {
+        dbOptions.setPreserveDeletes(preserveDeletes);
+        return this;
+    }
+
+    @Override
+    public boolean preserveDeletes() {
+        return dbOptions.preserveDeletes();
+    }
+
+    @Override
+    public Options setTwoWriteQueues(final boolean twoWriteQueues) {
+        dbOptions.setTwoWriteQueues(twoWriteQueues);
+        return this;
+    }
+
+    @Override
+    public boolean twoWriteQueues() {
+        return dbOptions.twoWriteQueues();
+    }
+
+    @Override
+    public Options setManualWalFlush(final boolean manualWalFlush) {
+        dbOptions.setManualWalFlush(manualWalFlush);
+        return this;
+    }
+
+    @Override
+    public boolean manualWalFlush() {
+        return dbOptions.manualWalFlush();
+    }
+
+    @Override
+    public Options setCfPaths(final Collection<DbPath> cfPaths) {
+        columnFamilyOptions.setCfPaths(cfPaths);
+        return this;
+    }
+
+    @Override
+    public List<DbPath> cfPaths() {
+        return columnFamilyOptions.cfPaths();
+    }
+
+    @Override
+    public Options setBottommostCompressionOptions(final CompressionOptions bottommostCompressionOptions) {
+        columnFamilyOptions.setBottommostCompressionOptions(bottommostCompressionOptions);
+        return this;
+    }
+
+    @Override
+    public CompressionOptions bottommostCompressionOptions() {
+        return columnFamilyOptions.bottommostCompressionOptions();
+    }
+
+    @Override
+    public Options setTtl(final long ttl) {
+        columnFamilyOptions.setTtl(ttl);
+        return this;
+    }
+
+    @Override
+    public long ttl() {
+        return columnFamilyOptions.ttl();
+    }
+
+    @Override
+    public Options setAtomicFlush(final boolean atomicFlush) {
+        dbOptions.setAtomicFlush(atomicFlush);
+        return this;
+    }
+
+    @Override
+    public boolean atomicFlush() {
+        return dbOptions.atomicFlush();
+    }
+
+    @Override
+    public Options setAvoidUnnecessaryBlockingIO(final boolean avoidUnnecessaryBlockingIO) {

Review comment:
       This also seems quite interesting... possibly something we should enable by default? I filed [KAFKA-12748](https://issues.apache.org/jira/browse/KAFKA-12748) to explore some of these new options and run benchmarks to determine which should be configured by default.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java
##########
@@ -1381,6 +1364,313 @@ public WriteBufferManager writeBufferManager() {
         return dbOptions.writeBufferManager();
     }
 
+    @Override
+    public Options setMaxWriteBatchGroupSizeBytes(final long maxWriteBatchGroupSizeBytes) {
+        dbOptions.setMaxWriteBatchGroupSizeBytes(maxWriteBatchGroupSizeBytes);
+        return this;
+    }
+
+    @Override
+    public long maxWriteBatchGroupSizeBytes() {
+        return dbOptions.maxWriteBatchGroupSizeBytes();
+    }
+
+    @Override
+    public Options oldDefaults(final int majorVersion, final int minorVersion) {
+        columnFamilyOptions.oldDefaults(majorVersion, minorVersion);
+        return this;
+    }
+
+    @Override
+    public Options optimizeForSmallDb(final Cache cache) {
+        return super.optimizeForSmallDb(cache);
+    }
+
+    @Override
+    public AbstractCompactionFilter<? extends AbstractSlice<?>> compactionFilter() {
+        return columnFamilyOptions.compactionFilter();
+    }
+
+    @Override
+    public AbstractCompactionFilterFactory<? extends AbstractCompactionFilter<?>> compactionFilterFactory() {
+        return columnFamilyOptions.compactionFilterFactory();
+    }
+
+    @Override
+    public Options setStatsPersistPeriodSec(final int statsPersistPeriodSec) {
+        dbOptions.setStatsPersistPeriodSec(statsPersistPeriodSec);
+        return this;
+    }
+
+    @Override
+    public int statsPersistPeriodSec() {
+        return dbOptions.statsPersistPeriodSec();
+    }
+
+    @Override
+    public Options setStatsHistoryBufferSize(final long statsHistoryBufferSize) {
+        dbOptions.setStatsHistoryBufferSize(statsHistoryBufferSize);
+        return this;
+    }
+
+    @Override
+    public long statsHistoryBufferSize() {
+        return dbOptions.statsHistoryBufferSize();
+    }
+
+    @Override
+    public Options setStrictBytesPerSync(final boolean strictBytesPerSync) {
+        dbOptions.setStrictBytesPerSync(strictBytesPerSync);
+        return this;
+    }
+
+    @Override
+    public boolean strictBytesPerSync() {
+        return dbOptions.strictBytesPerSync();
+    }
+
+    @Override
+    public Options setListeners(final List<AbstractEventListener> listeners) {
+        dbOptions.setListeners(listeners);
+        return this;
+    }
+
+    @Override
+    public List<AbstractEventListener> listeners() {
+        return dbOptions.listeners();
+    }
+
+    @Override
+    public Options setEnablePipelinedWrite(final boolean enablePipelinedWrite) {
+        dbOptions.setEnablePipelinedWrite(enablePipelinedWrite);
+        return this;
+    }
+
+    @Override
+    public boolean enablePipelinedWrite() {
+        return dbOptions.enablePipelinedWrite();
+    }
+
+    @Override
+    public Options setUnorderedWrite(final boolean unorderedWrite) {
+        dbOptions.setUnorderedWrite(unorderedWrite);
+        return this;
+    }
+
+    @Override
+    public boolean unorderedWrite() {
+        return dbOptions.unorderedWrite();
+    }
+
+    @Override
+    public Options setSkipCheckingSstFileSizesOnDbOpen(final boolean skipCheckingSstFileSizesOnDbOpen) {
+        dbOptions.setSkipCheckingSstFileSizesOnDbOpen(skipCheckingSstFileSizesOnDbOpen);
+        return this;
+    }
+
+    @Override
+    public boolean skipCheckingSstFileSizesOnDbOpen() {
+        return dbOptions.skipCheckingSstFileSizesOnDbOpen();
+    }
+
+    @Override
+    public Options setWalFilter(final AbstractWalFilter walFilter) {
+        dbOptions.setWalFilter(walFilter);
+        return this;
+    }
+
+    @Override
+    public WalFilter walFilter() {
+        return dbOptions.walFilter();
+    }
+
+    @Override
+    public Options setAllowIngestBehind(final boolean allowIngestBehind) {
+        dbOptions.setAllowIngestBehind(allowIngestBehind);
+        return this;
+    }
+
+    @Override
+    public boolean allowIngestBehind() {
+        return dbOptions.allowIngestBehind();
+    }
+
+    @Override
+    public Options setPreserveDeletes(final boolean preserveDeletes) {
+        dbOptions.setPreserveDeletes(preserveDeletes);
+        return this;
+    }
+
+    @Override
+    public boolean preserveDeletes() {
+        return dbOptions.preserveDeletes();
+    }
+
+    @Override
+    public Options setTwoWriteQueues(final boolean twoWriteQueues) {
+        dbOptions.setTwoWriteQueues(twoWriteQueues);
+        return this;
+    }
+
+    @Override
+    public boolean twoWriteQueues() {
+        return dbOptions.twoWriteQueues();
+    }
+
+    @Override
+    public Options setManualWalFlush(final boolean manualWalFlush) {
+        dbOptions.setManualWalFlush(manualWalFlush);
+        return this;
+    }
+
+    @Override
+    public boolean manualWalFlush() {
+        return dbOptions.manualWalFlush();
+    }
+
+    @Override
+    public Options setCfPaths(final Collection<DbPath> cfPaths) {
+        columnFamilyOptions.setCfPaths(cfPaths);
+        return this;
+    }
+
+    @Override
+    public List<DbPath> cfPaths() {
+        return columnFamilyOptions.cfPaths();
+    }
+
+    @Override
+    public Options setBottommostCompressionOptions(final CompressionOptions bottommostCompressionOptions) {
+        columnFamilyOptions.setBottommostCompressionOptions(bottommostCompressionOptions);
+        return this;
+    }
+
+    @Override
+    public CompressionOptions bottommostCompressionOptions() {
+        return columnFamilyOptions.bottommostCompressionOptions();
+    }
+
+    @Override
+    public Options setTtl(final long ttl) {
+        columnFamilyOptions.setTtl(ttl);

Review comment:
       Does this still require setting a specific compaction style (I think it was the FIFO style)? If so, maybe we should enforce that in some way, or else at least log a warning that they are incompatible. Of course, I'm not quite sure what would happen if you tried to set the ttl without FIFO compaction: maybe RocksDB would throw an exception upon opening, or just silently ignore the ttl, or... who knows. Might be worth a quick test to find out.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
@@ -396,6 +396,21 @@ public void close() {
                 log.info("Skipping to close non-initialized store {}", entry.getKey());
             }
         }
+        for (final StateStore store : globalStateStores) {

Review comment:
       Looking through the code in this class, this feels a bit weird. It seems like the only way we could have (open) stores in `globalStateStores` but not in `globalStores` is when we hit an exception while trying to close the store in `globalStores`. I'm all for retrying in theory (though I'm willing to bet that 99% of the time the failure is permanent/fatal), but if that's what we intend to do, we should just retry right after we hit that exception so it's clear what we're doing. Looping through this other structure feels very odd, as it implies these are distinct sets of StateStores instead of two data structures that should reflect the same set of underlying StateStores.
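
   The "retry right after the exception" idea could be sketched as below. This is only an illustration under stated assumptions: `Store`, `closeAll`, and `flakyStore` are hypothetical names invented for the sketch, not the Kafka Streams `StateStore` API or the actual `GlobalStateManagerImpl` code.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CloseWithRetrySketch {

    /** Hypothetical stand-in for a state store; not the Kafka Streams interface. */
    public interface Store {
        void close();
    }

    /**
     * Closes every store in a single pass. A failed close is retried once,
     * right where the failure happened. Returns the names of the stores
     * that still could not be closed.
     */
    public static List<String> closeAll(final Map<String, Store> stores) {
        final List<String> failed = new ArrayList<>();
        for (final Map.Entry<String, Store> entry : stores.entrySet()) {
            try {
                entry.getValue().close();
            } catch (final RuntimeException firstFailure) {
                try {
                    // Immediate retry, so the intent is visible at the failure site.
                    entry.getValue().close();
                } catch (final RuntimeException secondFailure) {
                    failed.add(entry.getKey());
                }
            }
        }
        return failed;
    }

    /** Helper for the demo: a store whose close() throws the given number of times. */
    public static Store flakyStore(final String name, final int failures) {
        return new Store() {
            private int remaining = failures;

            @Override
            public void close() {
                if (remaining > 0) {
                    remaining--;
                    throw new RuntimeException("close failed: " + name);
                }
            }
        };
    }

    public static void main(final String[] args) {
        final Map<String, Store> stores = new LinkedHashMap<>();
        stores.put("a", flakyStore("a", 0)); // closes on the first attempt
        stores.put("b", flakyStore("b", 1)); // closes on the retry
        stores.put("c", flakyStore("c", 2)); // fails twice and is reported
        System.out.println(closeAll(stores)); // prints [c]
    }
}
```

   With a single pass like this there is no need for a second loop over another collection; whether one retry is the right policy is a separate question.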







[GitHub] [kafka] cadonna commented on pull request #10568: KAFKA-8897: Upgrade RocksDB to 6.19.3

Posted by GitBox <gi...@apache.org>.
cadonna commented on pull request #10568:
URL: https://github.com/apache/kafka/pull/10568#issuecomment-833427130


   The system tests job: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4494/





[GitHub] [kafka] cadonna edited a comment on pull request #10568: KAFKA-8897: Upgrade RocksDB to 6.19.3

Posted by GitBox <gi...@apache.org>.
cadonna edited a comment on pull request #10568:
URL: https://github.com/apache/kafka/pull/10568#issuecomment-833427130


   The system tests job: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4500/





[GitHub] [kafka] cadonna commented on pull request #10568: KAFKA-8897: Upgrade RocksDB to 6.16.4

Posted by GitBox <gi...@apache.org>.
cadonna commented on pull request #10568:
URL: https://github.com/apache/kafka/pull/10568#issuecomment-832443804


   Apparently there are still leaks since we get `pure virtual method called` in the JDK 8 builds. I did not get them locally. I will investigate further.





[GitHub] [kafka] cadonna commented on pull request #10568: KAFKA-8897: Upgrade RocksDB to 6.19.3

Posted by GitBox <gi...@apache.org>.
cadonna commented on pull request #10568:
URL: https://github.com/apache/kafka/pull/10568#issuecomment-833790689


   The test failures in the build are unrelated and known to be flaky.
   The system test failures are unrelated since only Streams-broker compatibility tests failed.
   @guozhangwang @ableegoldman I think this PR is good to go.  





[GitHub] [kafka] guozhangwang commented on pull request #10568: KAFKA-8897: Upgrade RocksDB to 6.19.3

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #10568:
URL: https://github.com/apache/kafka/pull/10568#issuecomment-833674556


   @cadonna I've retriggered https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4501





[GitHub] [kafka] cadonna commented on a change in pull request #10568: KAFKA-8897: Upgrade RocksDB to 6.16.4

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #10568:
URL: https://github.com/apache/kafka/pull/10568#discussion_r626510307



##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
##########
@@ -383,8 +386,7 @@ public void shouldLogAndMeterWhenSkippingNullKeyWithBuiltInMetricsVersionLatest(
     }
 
     private void shouldLogAndMeterWhenSkippingNullKeyWithBuiltInMetrics(final String builtInMetricsVersion) {
-        final InternalMockProcessorContext context = createInternalMockProcessorContext(builtInMetricsVersion);
-        processor.init(context);
+        setup(builtInMetricsVersion, true);

Review comment:
       I do not know. Will set it to `false`.







[GitHub] [kafka] cadonna commented on pull request #10568: KAFKA-8897: Upgrade RocksDB to 6.16.4

Posted by GitBox <gi...@apache.org>.
cadonna commented on pull request #10568:
URL: https://github.com/apache/kafka/pull/10568#issuecomment-832548068


   I found a `pure virtual method called` in the ARM builds of the following unrelated PRs: https://github.com/apache/kafka/pull/9779 and https://github.com/apache/kafka/pull/10631
   Hence, that error in the ARM builds is not directly related to the RocksDB upgrade but was there before. I guess the cause is one of the RocksDB leaks we have in our code, which we will hopefully fix with this PR.
   It is a bit scary that we probably had this error in the ARM builds before and nobody cared about it.
   
   





[GitHub] [kafka] cadonna commented on a change in pull request #10568: KAFKA-8897: Upgrade RocksDB to 6.19.3

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #10568:
URL: https://github.com/apache/kafka/pull/10568#discussion_r629163797



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java
##########
@@ -1381,6 +1364,313 @@ public WriteBufferManager writeBufferManager() {
         return dbOptions.writeBufferManager();
     }
 
+    @Override
+    public Options setMaxWriteBatchGroupSizeBytes(final long maxWriteBatchGroupSizeBytes) {
+        dbOptions.setMaxWriteBatchGroupSizeBytes(maxWriteBatchGroupSizeBytes);
+        return this;
+    }
+
+    @Override
+    public long maxWriteBatchGroupSizeBytes() {
+        return dbOptions.maxWriteBatchGroupSizeBytes();
+    }
+
+    @Override
+    public Options oldDefaults(final int majorVersion, final int minorVersion) {
+        columnFamilyOptions.oldDefaults(majorVersion, minorVersion);
+        return this;
+    }
+
+    @Override
+    public Options optimizeForSmallDb(final Cache cache) {
+        return super.optimizeForSmallDb(cache);
+    }
+
+    @Override
+    public AbstractCompactionFilter<? extends AbstractSlice<?>> compactionFilter() {
+        return columnFamilyOptions.compactionFilter();
+    }
+
+    @Override
+    public AbstractCompactionFilterFactory<? extends AbstractCompactionFilter<?>> compactionFilterFactory() {
+        return columnFamilyOptions.compactionFilterFactory();
+    }
+
+    @Override
+    public Options setStatsPersistPeriodSec(final int statsPersistPeriodSec) {
+        dbOptions.setStatsPersistPeriodSec(statsPersistPeriodSec);
+        return this;
+    }
+
+    @Override
+    public int statsPersistPeriodSec() {
+        return dbOptions.statsPersistPeriodSec();
+    }
+
+    @Override
+    public Options setStatsHistoryBufferSize(final long statsHistoryBufferSize) {
+        dbOptions.setStatsHistoryBufferSize(statsHistoryBufferSize);
+        return this;
+    }
+
+    @Override
+    public long statsHistoryBufferSize() {
+        return dbOptions.statsHistoryBufferSize();
+    }
+
+    @Override
+    public Options setStrictBytesPerSync(final boolean strictBytesPerSync) {
+        dbOptions.setStrictBytesPerSync(strictBytesPerSync);
+        return this;
+    }
+
+    @Override
+    public boolean strictBytesPerSync() {
+        return dbOptions.strictBytesPerSync();
+    }
+
+    @Override
+    public Options setListeners(final List<AbstractEventListener> listeners) {
+        dbOptions.setListeners(listeners);
+        return this;
+    }
+
+    @Override
+    public List<AbstractEventListener> listeners() {
+        return dbOptions.listeners();
+    }
+
+    @Override
+    public Options setEnablePipelinedWrite(final boolean enablePipelinedWrite) {
+        dbOptions.setEnablePipelinedWrite(enablePipelinedWrite);
+        return this;
+    }
+
+    @Override
+    public boolean enablePipelinedWrite() {
+        return dbOptions.enablePipelinedWrite();
+    }
+
+    @Override
+    public Options setUnorderedWrite(final boolean unorderedWrite) {
+        dbOptions.setUnorderedWrite(unorderedWrite);
+        return this;
+    }
+
+    @Override
+    public boolean unorderedWrite() {
+        return dbOptions.unorderedWrite();
+    }
+
+    @Override
+    public Options setSkipCheckingSstFileSizesOnDbOpen(final boolean skipCheckingSstFileSizesOnDbOpen) {
+        dbOptions.setSkipCheckingSstFileSizesOnDbOpen(skipCheckingSstFileSizesOnDbOpen);
+        return this;
+    }
+
+    @Override
+    public boolean skipCheckingSstFileSizesOnDbOpen() {
+        return dbOptions.skipCheckingSstFileSizesOnDbOpen();
+    }
+
+    @Override
+    public Options setWalFilter(final AbstractWalFilter walFilter) {
+        dbOptions.setWalFilter(walFilter);
+        return this;
+    }
+
+    @Override
+    public WalFilter walFilter() {
+        return dbOptions.walFilter();
+    }
+
+    @Override
+    public Options setAllowIngestBehind(final boolean allowIngestBehind) {
+        dbOptions.setAllowIngestBehind(allowIngestBehind);
+        return this;
+    }
+
+    @Override
+    public boolean allowIngestBehind() {
+        return dbOptions.allowIngestBehind();
+    }
+
+    @Override
+    public Options setPreserveDeletes(final boolean preserveDeletes) {
+        dbOptions.setPreserveDeletes(preserveDeletes);
+        return this;
+    }
+
+    @Override
+    public boolean preserveDeletes() {
+        return dbOptions.preserveDeletes();
+    }
+
+    @Override
+    public Options setTwoWriteQueues(final boolean twoWriteQueues) {
+        dbOptions.setTwoWriteQueues(twoWriteQueues);
+        return this;
+    }
+
+    @Override
+    public boolean twoWriteQueues() {
+        return dbOptions.twoWriteQueues();
+    }
+
+    @Override
+    public Options setManualWalFlush(final boolean manualWalFlush) {
+        dbOptions.setManualWalFlush(manualWalFlush);
+        return this;
+    }
+
+    @Override
+    public boolean manualWalFlush() {
+        return dbOptions.manualWalFlush();
+    }
+
+    @Override
+    public Options setCfPaths(final Collection<DbPath> cfPaths) {
+        columnFamilyOptions.setCfPaths(cfPaths);
+        return this;
+    }
+
+    @Override
+    public List<DbPath> cfPaths() {
+        return columnFamilyOptions.cfPaths();
+    }
+
+    @Override
+    public Options setBottommostCompressionOptions(final CompressionOptions bottommostCompressionOptions) {
+        columnFamilyOptions.setBottommostCompressionOptions(bottommostCompressionOptions);
+        return this;
+    }
+
+    @Override
+    public CompressionOptions bottommostCompressionOptions() {
+        return columnFamilyOptions.bottommostCompressionOptions();
+    }
+
+    @Override
+    public Options setTtl(final long ttl) {
+        columnFamilyOptions.setTtl(ttl);

Review comment:
       > In that case we may want to warn users that this option will not, in fact, provide "real" ttl (unless also using FIFO compaction). I'd imagine users may see this new option pop up and assume, as I did, that it meant deleting files older than ttl, ie the definition of ttl...
   
   If you mean warning users with a log message, I am not sure we should log something just because we assume that the users' expectations are probably wrong.
   
   > Also, not totally sure I understand -- does that mean only files older than the ttl will be compacted, or just that if a file remains uncompacted for longer than the ttl it will be "prematurely" compacted? I assume the latter...
   
   I do not totally understand it either, but I also think it is the latter.
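
   To make the two readings of ttl concrete, here is a toy model of the interpretation assumed in this thread: with FIFO compaction an expired file is dropped, otherwise it is only picked up for compaction. It is not RocksDB's implementation; `Style`, `Action`, and `onTtlCheck` are hypothetical names, and treating a ttl of 0 as "disabled" is an assumption of the sketch.

```java
public class TtlInterpretationSketch {

    /** Compaction styles named in the discussion; a toy enum, not org.rocksdb.CompactionStyle. */
    public enum Style { FIFO, LEVEL }

    /** What the toy model does with a file when the ttl check runs. */
    public enum Action { KEEP, COMPACT, DELETE }

    public static Action onTtlCheck(final Style style, final long fileAgeSec, final long ttlSec) {
        // Assumption of this sketch: a non-positive ttl means the check is disabled.
        if (ttlSec <= 0 || fileAgeSec <= ttlSec) {
            return Action.KEEP;
        }
        // Expired file: "real" ttl (deletion) only under FIFO; otherwise the file
        // is merely compacted earlier than it would have been.
        return style == Style.FIFO ? Action.DELETE : Action.COMPACT;
    }

    public static void main(final String[] args) {
        System.out.println(onTtlCheck(Style.FIFO, 120, 60));  // prints DELETE
        System.out.println(onTtlCheck(Style.LEVEL, 120, 60)); // prints COMPACT
        System.out.println(onTtlCheck(Style.LEVEL, 30, 60));  // prints KEEP
    }
}
```

   If this reading is right, a user who sets only the ttl and expects old data to disappear would see it linger, which is the surprise discussed above.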







[GitHub] [kafka] guozhangwang commented on a change in pull request #10568: KAFKA-8897: Upgrade RocksDB to 6.19.3

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #10568:
URL: https://github.com/apache/kafka/pull/10568#discussion_r629538083



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java
##########
@@ -1381,6 +1364,313 @@ public WriteBufferManager writeBufferManager() {
         return dbOptions.writeBufferManager();
     }
 
+    @Override
+    public Options setMaxWriteBatchGroupSizeBytes(final long maxWriteBatchGroupSizeBytes) {
+        dbOptions.setMaxWriteBatchGroupSizeBytes(maxWriteBatchGroupSizeBytes);
+        return this;
+    }
+
+    @Override
+    public long maxWriteBatchGroupSizeBytes() {
+        return dbOptions.maxWriteBatchGroupSizeBytes();
+    }
+
+    @Override
+    public Options oldDefaults(final int majorVersion, final int minorVersion) {
+        columnFamilyOptions.oldDefaults(majorVersion, minorVersion);
+        return this;
+    }
+
+    @Override
+    public Options optimizeForSmallDb(final Cache cache) {
+        return super.optimizeForSmallDb(cache);
+    }
+
+    @Override
+    public AbstractCompactionFilter<? extends AbstractSlice<?>> compactionFilter() {
+        return columnFamilyOptions.compactionFilter();
+    }
+
+    @Override
+    public AbstractCompactionFilterFactory<? extends AbstractCompactionFilter<?>> compactionFilterFactory() {
+        return columnFamilyOptions.compactionFilterFactory();
+    }
+
+    @Override
+    public Options setStatsPersistPeriodSec(final int statsPersistPeriodSec) {
+        dbOptions.setStatsPersistPeriodSec(statsPersistPeriodSec);
+        return this;
+    }
+
+    @Override
+    public int statsPersistPeriodSec() {
+        return dbOptions.statsPersistPeriodSec();
+    }
+
+    @Override
+    public Options setStatsHistoryBufferSize(final long statsHistoryBufferSize) {
+        dbOptions.setStatsHistoryBufferSize(statsHistoryBufferSize);
+        return this;
+    }
+
+    @Override
+    public long statsHistoryBufferSize() {
+        return dbOptions.statsHistoryBufferSize();
+    }
+
+    @Override
+    public Options setStrictBytesPerSync(final boolean strictBytesPerSync) {
+        dbOptions.setStrictBytesPerSync(strictBytesPerSync);
+        return this;
+    }
+
+    @Override
+    public boolean strictBytesPerSync() {
+        return dbOptions.strictBytesPerSync();
+    }
+
+    @Override
+    public Options setListeners(final List<AbstractEventListener> listeners) {
+        dbOptions.setListeners(listeners);
+        return this;
+    }
+
+    @Override
+    public List<AbstractEventListener> listeners() {
+        return dbOptions.listeners();
+    }
+
+    @Override
+    public Options setEnablePipelinedWrite(final boolean enablePipelinedWrite) {
+        dbOptions.setEnablePipelinedWrite(enablePipelinedWrite);
+        return this;
+    }
+
+    @Override
+    public boolean enablePipelinedWrite() {
+        return dbOptions.enablePipelinedWrite();
+    }
+
+    @Override
+    public Options setUnorderedWrite(final boolean unorderedWrite) {
+        dbOptions.setUnorderedWrite(unorderedWrite);
+        return this;
+    }
+
+    @Override
+    public boolean unorderedWrite() {
+        return dbOptions.unorderedWrite();
+    }
+
+    @Override
+    public Options setSkipCheckingSstFileSizesOnDbOpen(final boolean skipCheckingSstFileSizesOnDbOpen) {
+        dbOptions.setSkipCheckingSstFileSizesOnDbOpen(skipCheckingSstFileSizesOnDbOpen);
+        return this;
+    }
+
+    @Override
+    public boolean skipCheckingSstFileSizesOnDbOpen() {
+        return dbOptions.skipCheckingSstFileSizesOnDbOpen();
+    }
+
+    @Override
+    public Options setWalFilter(final AbstractWalFilter walFilter) {
+        dbOptions.setWalFilter(walFilter);
+        return this;
+    }
+
+    @Override
+    public WalFilter walFilter() {
+        return dbOptions.walFilter();
+    }
+
+    @Override
+    public Options setAllowIngestBehind(final boolean allowIngestBehind) {
+        dbOptions.setAllowIngestBehind(allowIngestBehind);
+        return this;
+    }
+
+    @Override
+    public boolean allowIngestBehind() {
+        return dbOptions.allowIngestBehind();
+    }
+
+    @Override
+    public Options setPreserveDeletes(final boolean preserveDeletes) {
+        dbOptions.setPreserveDeletes(preserveDeletes);
+        return this;
+    }
+
+    @Override
+    public boolean preserveDeletes() {
+        return dbOptions.preserveDeletes();
+    }
+
+    @Override
+    public Options setTwoWriteQueues(final boolean twoWriteQueues) {
+        dbOptions.setTwoWriteQueues(twoWriteQueues);
+        return this;
+    }
+
+    @Override
+    public boolean twoWriteQueues() {
+        return dbOptions.twoWriteQueues();
+    }
+
+    @Override
+    public Options setManualWalFlush(final boolean manualWalFlush) {
+        dbOptions.setManualWalFlush(manualWalFlush);
+        return this;
+    }
+
+    @Override
+    public boolean manualWalFlush() {
+        return dbOptions.manualWalFlush();
+    }
+
+    @Override
+    public Options setCfPaths(final Collection<DbPath> cfPaths) {
+        columnFamilyOptions.setCfPaths(cfPaths);
+        return this;
+    }
+
+    @Override
+    public List<DbPath> cfPaths() {
+        return columnFamilyOptions.cfPaths();
+    }
+
+    @Override
+    public Options setBottommostCompressionOptions(final CompressionOptions bottommostCompressionOptions) {
+        columnFamilyOptions.setBottommostCompressionOptions(bottommostCompressionOptions);
+        return this;
+    }
+
+    @Override
+    public CompressionOptions bottommostCompressionOptions() {
+        return columnFamilyOptions.bottommostCompressionOptions();
+    }
+
+    @Override
+    public Options setTtl(final long ttl) {
+        columnFamilyOptions.setTtl(ttl);

Review comment:
       It feels like the latter to me too.
   
   BTW I think RocksDB's own javadocs are sufficient. If users do want to set these options, they must be advanced enough to know what exactly RocksDB is doing. As Streams, we just need to keep it in mind ourselves so that if someone asks "why does setting TTL not work in Streams RocksDB?" we can refer them to read this first :)







[GitHub] [kafka] ableegoldman commented on a change in pull request #10568: KAFKA-8897: Upgrade RocksDB to 6.19.3

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #10568:
URL: https://github.com/apache/kafka/pull/10568#discussion_r628815764



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java
##########
@@ -1381,6 +1364,313 @@ public WriteBufferManager writeBufferManager() {
         return dbOptions.writeBufferManager();
     }
 
+    @Override
+    public Options setMaxWriteBatchGroupSizeBytes(final long maxWriteBatchGroupSizeBytes) {
+        dbOptions.setMaxWriteBatchGroupSizeBytes(maxWriteBatchGroupSizeBytes);
+        return this;
+    }
+
+    @Override
+    public long maxWriteBatchGroupSizeBytes() {
+        return dbOptions.maxWriteBatchGroupSizeBytes();
+    }
+
+    @Override
+    public Options oldDefaults(final int majorVersion, final int minorVersion) {
+        columnFamilyOptions.oldDefaults(majorVersion, minorVersion);
+        return this;
+    }
+
+    @Override
+    public Options optimizeForSmallDb(final Cache cache) {
+        return super.optimizeForSmallDb(cache);
+    }
+
+    @Override
+    public AbstractCompactionFilter<? extends AbstractSlice<?>> compactionFilter() {
+        return columnFamilyOptions.compactionFilter();
+    }
+
+    @Override
+    public AbstractCompactionFilterFactory<? extends AbstractCompactionFilter<?>> compactionFilterFactory() {
+        return columnFamilyOptions.compactionFilterFactory();
+    }
+
+    @Override
+    public Options setStatsPersistPeriodSec(final int statsPersistPeriodSec) {
+        dbOptions.setStatsPersistPeriodSec(statsPersistPeriodSec);
+        return this;
+    }
+
+    @Override
+    public int statsPersistPeriodSec() {
+        return dbOptions.statsPersistPeriodSec();
+    }
+
+    @Override
+    public Options setStatsHistoryBufferSize(final long statsHistoryBufferSize) {
+        dbOptions.setStatsHistoryBufferSize(statsHistoryBufferSize);
+        return this;
+    }
+
+    @Override
+    public long statsHistoryBufferSize() {
+        return dbOptions.statsHistoryBufferSize();
+    }
+
+    @Override
+    public Options setStrictBytesPerSync(final boolean strictBytesPerSync) {
+        dbOptions.setStrictBytesPerSync(strictBytesPerSync);
+        return this;
+    }
+
+    @Override
+    public boolean strictBytesPerSync() {
+        return dbOptions.strictBytesPerSync();
+    }
+
+    @Override
+    public Options setListeners(final List<AbstractEventListener> listeners) {
+        dbOptions.setListeners(listeners);
+        return this;
+    }
+
+    @Override
+    public List<AbstractEventListener> listeners() {
+        return dbOptions.listeners();
+    }
+
+    @Override
+    public Options setEnablePipelinedWrite(final boolean enablePipelinedWrite) {
+        dbOptions.setEnablePipelinedWrite(enablePipelinedWrite);
+        return this;
+    }
+
+    @Override
+    public boolean enablePipelinedWrite() {
+        return dbOptions.enablePipelinedWrite();
+    }
+
+    @Override
+    public Options setUnorderedWrite(final boolean unorderedWrite) {
+        dbOptions.setUnorderedWrite(unorderedWrite);
+        return this;
+    }
+
+    @Override
+    public boolean unorderedWrite() {
+        return dbOptions.unorderedWrite();
+    }
+
+    @Override
+    public Options setSkipCheckingSstFileSizesOnDbOpen(final boolean skipCheckingSstFileSizesOnDbOpen) {
+        dbOptions.setSkipCheckingSstFileSizesOnDbOpen(skipCheckingSstFileSizesOnDbOpen);
+        return this;
+    }
+
+    @Override
+    public boolean skipCheckingSstFileSizesOnDbOpen() {
+        return dbOptions.skipCheckingSstFileSizesOnDbOpen();
+    }
+
+    @Override
+    public Options setWalFilter(final AbstractWalFilter walFilter) {
+        dbOptions.setWalFilter(walFilter);

Review comment:
       > Whatever, we decide I would like to have that in a separate PR. 
   
   Sounds good. If you don't have time to follow up right away, can you just file a quick ticket so we don't forget? 
   
   Maybe you are not as forgetful as I am, but I always worry that things like this, whether small or critically important, can easily get lost in the flood of other things we are all working on at any given time 🙂 







[GitHub] [kafka] ableegoldman commented on a change in pull request #10568: KAFKA-8897: Upgrade RocksDB to 6.16.4

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #10568:
URL: https://github.com/apache/kafka/pull/10568#discussion_r626175996



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
@@ -396,6 +396,21 @@ public void close() {
                 log.info("Skipping to close non-initialized store {}", entry.getKey());
             }
         }
+        for (final StateStore store : globalStateStores) {

Review comment:
       Heh, @guozhangwang and I reviewed at the same time. I didn't notice that `globalStores` would not be populated until restoration, whereas `globalStateStores` is populated in the constructor. IMO we should just populate `globalStores` in the constructor as well, but I guess that won't be necessary if @guozhangwang does a quick followup to consolidate them.
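
   A consolidation along these lines might look like the sketch below: one collection, filled in the constructor, so `close()` sees every store no matter how far restoration got. The class and method names are hypothetical and only illustrate the idea; this is not the actual `GlobalStateManagerImpl`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SingleRegistrySketch {

    /** Hypothetical stand-in for a state store; not the Kafka Streams interface. */
    public static class Store {
        private final String name;
        private boolean open = false;

        public Store(final String name) {
            this.name = name;
        }

        public String name() {
            return name;
        }

        public void open() {
            open = true;
        }

        public boolean isOpen() {
            return open;
        }

        public void close() {
            open = false;
        }
    }

    // The only collection of stores, populated once in the constructor.
    private final Map<String, Store> stores = new LinkedHashMap<>();

    public SingleRegistrySketch(final List<Store> allStores) {
        for (final Store store : allStores) {
            stores.put(store.name(), store);
        }
    }

    /** Closes every store that is open and returns the names of the closed stores. */
    public List<String> close() {
        final List<String> closed = new ArrayList<>();
        for (final Store store : stores.values()) {
            if (store.isOpen()) {
                store.close();
                closed.add(store.name());
            }
        }
        return closed;
    }

    public static void main(final String[] args) {
        final Store first = new Store("first");
        final Store second = new Store("second");
        final Store neverOpened = new Store("never-opened");
        first.open();
        second.open();
        final SingleRegistrySketch manager =
            new SingleRegistrySketch(Arrays.asList(first, second, neverOpened));
        System.out.println(manager.close()); // prints [first, second]
    }
}
```

   Because the registry does not depend on restoration having run, an open store cannot be missed at close time, and a never-opened store is simply skipped.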







[GitHub] [kafka] cadonna commented on a change in pull request #10568: KAFKA-8897: Upgrade RocksDB to 6.16.4

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #10568:
URL: https://github.com/apache/kafka/pull/10568#discussion_r626501582



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java
##########
@@ -1381,6 +1364,313 @@ public WriteBufferManager writeBufferManager() {
         return dbOptions.writeBufferManager();
     }
 
+    @Override
+    public Options setMaxWriteBatchGroupSizeBytes(final long maxWriteBatchGroupSizeBytes) {
+        dbOptions.setMaxWriteBatchGroupSizeBytes(maxWriteBatchGroupSizeBytes);
+        return this;
+    }
+
+    @Override
+    public long maxWriteBatchGroupSizeBytes() {
+        return dbOptions.maxWriteBatchGroupSizeBytes();
+    }
+
+    @Override
+    public Options oldDefaults(final int majorVersion, final int minorVersion) {
+        columnFamilyOptions.oldDefaults(majorVersion, minorVersion);
+        return this;
+    }
+
+    @Override
+    public Options optimizeForSmallDb(final Cache cache) {
+        return super.optimizeForSmallDb(cache);
+    }
+
+    @Override
+    public AbstractCompactionFilter<? extends AbstractSlice<?>> compactionFilter() {
+        return columnFamilyOptions.compactionFilter();
+    }
+
+    @Override
+    public AbstractCompactionFilterFactory<? extends AbstractCompactionFilter<?>> compactionFilterFactory() {
+        return columnFamilyOptions.compactionFilterFactory();
+    }
+
+    @Override
+    public Options setStatsPersistPeriodSec(final int statsPersistPeriodSec) {
+        dbOptions.setStatsPersistPeriodSec(statsPersistPeriodSec);
+        return this;
+    }
+
+    @Override
+    public int statsPersistPeriodSec() {
+        return dbOptions.statsPersistPeriodSec();
+    }
+
+    @Override
+    public Options setStatsHistoryBufferSize(final long statsHistoryBufferSize) {
+        dbOptions.setStatsHistoryBufferSize(statsHistoryBufferSize);
+        return this;
+    }
+
+    @Override
+    public long statsHistoryBufferSize() {
+        return dbOptions.statsHistoryBufferSize();
+    }
+
+    @Override
+    public Options setStrictBytesPerSync(final boolean strictBytesPerSync) {
+        dbOptions.setStrictBytesPerSync(strictBytesPerSync);
+        return this;
+    }
+
+    @Override
+    public boolean strictBytesPerSync() {
+        return dbOptions.strictBytesPerSync();
+    }
+
+    @Override
+    public Options setListeners(final List<AbstractEventListener> listeners) {
+        dbOptions.setListeners(listeners);
+        return this;
+    }
+
+    @Override
+    public List<AbstractEventListener> listeners() {
+        return dbOptions.listeners();
+    }
+
+    @Override
+    public Options setEnablePipelinedWrite(final boolean enablePipelinedWrite) {
+        dbOptions.setEnablePipelinedWrite(enablePipelinedWrite);
+        return this;
+    }
+
+    @Override
+    public boolean enablePipelinedWrite() {
+        return dbOptions.enablePipelinedWrite();
+    }
+
+    @Override
+    public Options setUnorderedWrite(final boolean unorderedWrite) {
+        dbOptions.setUnorderedWrite(unorderedWrite);
+        return this;
+    }
+
+    @Override
+    public boolean unorderedWrite() {
+        return dbOptions.unorderedWrite();
+    }
+
+    @Override
+    public Options setSkipCheckingSstFileSizesOnDbOpen(final boolean skipCheckingSstFileSizesOnDbOpen) {
+        dbOptions.setSkipCheckingSstFileSizesOnDbOpen(skipCheckingSstFileSizesOnDbOpen);
+        return this;
+    }
+
+    @Override
+    public boolean skipCheckingSstFileSizesOnDbOpen() {
+        return dbOptions.skipCheckingSstFileSizesOnDbOpen();
+    }
+
+    @Override
+    public Options setWalFilter(final AbstractWalFilter walFilter) {
+        dbOptions.setWalFilter(walFilter);

Review comment:
       I am not sure since users could theoretically activate the WAL in the config setter. We do not overwrite user settings as far as I can see. That is consistent with the rest of Streams where the users can overwrite parameters set by the DSL.
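
The override order described in this comment can be sketched as follows. This is a minimal, self-contained model and not Kafka Streams or RocksDB code: `ConfigOverrideSketch`, `StoreOptions`, `openStore` and the `walDisabled` flag are hypothetical names, and the default value used is an assumption made for illustration. The only point shown is that the user's setter runs after the defaults and nothing is re-applied afterwards.

```java
import java.util.function.Consumer;

final class ConfigOverrideSketch {

    // Hypothetical stand-in for the options object handed to a user's config setter.
    static final class StoreOptions {
        private boolean walDisabled;

        StoreOptions setWalDisabled(final boolean walDisabled) {
            this.walDisabled = walDisabled;
            return this;
        }

        boolean walDisabled() {
            return walDisabled;
        }
    }

    // Apply the framework defaults first, then let the user's setter run last.
    // Nothing is re-applied afterwards, so whatever the user sets wins.
    static StoreOptions openStore(final Consumer<StoreOptions> userConfigSetter) {
        final StoreOptions options = new StoreOptions();
        options.setWalDisabled(true);          // assumed framework default
        if (userConfigSetter != null) {
            userConfigSetter.accept(options);  // the user may override any default
        }
        return options;
    }

    public static void main(final String[] args) {
        final StoreOptions defaults = openStore(null);
        final StoreOptions overridden = openStore(o -> o.setWalDisabled(false));
        System.out.println("default walDisabled = " + defaults.walDisabled());
        System.out.println("overridden walDisabled = " + overridden.walDisabled());
    }
}
```

If the framework wanted to forbid such overrides, it would have to re-apply its own settings after the setter has run, which is what the comment says does not happen.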







[GitHub] [kafka] guozhangwang commented on a change in pull request #10568: KAFKA-8897: Upgrade RocksDB to 6.16.4

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #10568:
URL: https://github.com/apache/kafka/pull/10568#discussion_r626160997



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
@@ -396,6 +396,21 @@ public void close() {
                 log.info("Skipping to close non-initialized store {}", entry.getKey());
             }
         }
+        for (final StateStore store : globalStateStores) {

Review comment:
       I was surprised to see that we actually have two as well. I took a quick look at them, and I think they can indeed be consolidated. Some more details:
   
   1) `mgr.initialize` tries to register all stores in `globalStateStores`, which puts them into the other `globalStores` map one by one via the `registerStateStore` call. So after the `initialize` call, the two collections should contain the same metadata.
   
   2) Note that before the `initialize` call, no stores should be open yet. So if a failure happens before that call, all stores should still be closed, and this logic would never be triggered.
   
   3) Within `initialize` we call `restoreState`, and only after that do we add the store to `globalStores`. So if a failure happens during `restoreState`, `globalStores` would not contain that store, and we have to rely on `globalStateStores` to close it.
   
   Based on that, I can file a quick follow-up fix after your PR to consolidate these two.
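
The failure case in point 3 can be illustrated with a small, self-contained model. It is a sketch under stated assumptions and not the actual `GlobalStateManagerImpl`: `GlobalStoreCloseSketch`, `Store`, `Manager`, `allStores` (playing the role of `globalStateStores`) and `registeredStores` (playing the role of `globalStores`) are hypothetical names, and the `failOn` parameter only simulates a failure during restoration.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class GlobalStoreCloseSketch {

    // Hypothetical minimal store that only tracks whether it is open.
    static final class Store {
        final String name;
        private boolean open;

        Store(final String name) {
            this.name = name;
        }

        void init() {
            open = true;
        }

        void close() {
            open = false;
        }

        boolean isOpen() {
            return open;
        }
    }

    static final class Manager {
        // Filled in the constructor (role of globalStateStores).
        private final Set<Store> allStores = new LinkedHashSet<>();
        // Filled only after a store has been restored (role of globalStores).
        private final Map<String, Store> registeredStores = new LinkedHashMap<>();

        Manager(final List<Store> stores) {
            allStores.addAll(stores);
        }

        // Opens each store, "restores" it, then registers it.
        // failOn names a store whose restoration should fail (may be null).
        void initialize(final String failOn) {
            for (final Store store : allStores) {
                store.init();
                if (store.name.equals(failOn)) {
                    throw new IllegalStateException("restore failed for " + store.name);
                }
                registeredStores.put(store.name, store);
            }
        }

        void close() {
            // First pass: stores that completed registration.
            for (final Store store : registeredStores.values()) {
                if (store.isOpen()) {
                    store.close();
                }
            }
            // Second pass: stores that were opened but never registered.
            for (final Store store : allStores) {
                if (store.isOpen()) {
                    store.close();
                }
            }
        }
    }

    public static void main(final String[] args) {
        final Store a = new Store("a");
        final Store b = new Store("b");
        final Manager manager = new Manager(Arrays.asList(a, b));
        try {
            manager.initialize("b");
        } catch (final IllegalStateException e) {
            System.out.println("initialize failed: " + e.getMessage());
        }
        System.out.println("b open before close: " + b.isOpen());
        manager.close();
        System.out.println("b open after close: " + b.isOpen());
    }
}
```

In this model the second pass is what closes the store that failed during restoration. Consolidating the two collections would presumably make one pass sufficient.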

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
##########
@@ -383,8 +386,7 @@ public void shouldLogAndMeterWhenSkippingNullKeyWithBuiltInMetricsVersionLatest(
     }
 
     private void shouldLogAndMeterWhenSkippingNullKeyWithBuiltInMetrics(final String builtInMetricsVersion) {
-        final InternalMockProcessorContext context = createInternalMockProcessorContext(builtInMetricsVersion);
-        processor.init(context);
+        setup(builtInMetricsVersion, true);

Review comment:
       Could you remind me: why do we need to enable caching here, but not in the others below?

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
##########
@@ -189,16 +199,6 @@ public void shouldCloseStateStoresOnClose() throws Exception {
         assertFalse(globalStore.isOpen());
     }
 
-    @Test
-    public void shouldTransitionToDeadOnClose() throws Exception {

Review comment:
       Why remove this test?

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
##########
@@ -242,6 +242,7 @@ private void shouldLogAndMeterWhenSkippingNullLeftKey(final String builtInMetric
         props.setProperty(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG, builtInMetricsVersion);
 
         try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(KStreamKTableJoin.class)) {
+            driver.close();

Review comment:
       Nice catch.







[GitHub] [kafka] cadonna commented on pull request #10568: KAFKA-8897: Upgrade RocksDB to 6.19.3

Posted by GitBox <gi...@apache.org>.
cadonna commented on pull request #10568:
URL: https://github.com/apache/kafka/pull/10568#issuecomment-832838475


   The builds fail due to known flaky tests, but not due to SIGABRT. 🥳 





[GitHub] [kafka] cadonna commented on a change in pull request #10568: KAFKA-8897: Upgrade RocksDB to 6.19.3

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #10568:
URL: https://github.com/apache/kafka/pull/10568#discussion_r627221015



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java
##########
@@ -1381,6 +1364,313 @@ public WriteBufferManager writeBufferManager() {
         return dbOptions.writeBufferManager();
     }
 
+    @Override
+    public Options setMaxWriteBatchGroupSizeBytes(final long maxWriteBatchGroupSizeBytes) {
+        dbOptions.setMaxWriteBatchGroupSizeBytes(maxWriteBatchGroupSizeBytes);
+        return this;
+    }
+
+    @Override
+    public long maxWriteBatchGroupSizeBytes() {
+        return dbOptions.maxWriteBatchGroupSizeBytes();
+    }
+
+    @Override
+    public Options oldDefaults(final int majorVersion, final int minorVersion) {
+        columnFamilyOptions.oldDefaults(majorVersion, minorVersion);
+        return this;
+    }
+
+    @Override
+    public Options optimizeForSmallDb(final Cache cache) {
+        return super.optimizeForSmallDb(cache);
+    }
+
+    @Override
+    public AbstractCompactionFilter<? extends AbstractSlice<?>> compactionFilter() {
+        return columnFamilyOptions.compactionFilter();
+    }
+
+    @Override
+    public AbstractCompactionFilterFactory<? extends AbstractCompactionFilter<?>> compactionFilterFactory() {
+        return columnFamilyOptions.compactionFilterFactory();
+    }
+
+    @Override
+    public Options setStatsPersistPeriodSec(final int statsPersistPeriodSec) {
+        dbOptions.setStatsPersistPeriodSec(statsPersistPeriodSec);
+        return this;
+    }
+
+    @Override
+    public int statsPersistPeriodSec() {
+        return dbOptions.statsPersistPeriodSec();
+    }
+
+    @Override
+    public Options setStatsHistoryBufferSize(final long statsHistoryBufferSize) {
+        dbOptions.setStatsHistoryBufferSize(statsHistoryBufferSize);
+        return this;
+    }
+
+    @Override
+    public long statsHistoryBufferSize() {
+        return dbOptions.statsHistoryBufferSize();
+    }
+
+    @Override
+    public Options setStrictBytesPerSync(final boolean strictBytesPerSync) {
+        dbOptions.setStrictBytesPerSync(strictBytesPerSync);
+        return this;
+    }
+
+    @Override
+    public boolean strictBytesPerSync() {
+        return dbOptions.strictBytesPerSync();
+    }
+
+    @Override
+    public Options setListeners(final List<AbstractEventListener> listeners) {
+        dbOptions.setListeners(listeners);
+        return this;
+    }
+
+    @Override
+    public List<AbstractEventListener> listeners() {
+        return dbOptions.listeners();
+    }
+
+    @Override
+    public Options setEnablePipelinedWrite(final boolean enablePipelinedWrite) {
+        dbOptions.setEnablePipelinedWrite(enablePipelinedWrite);
+        return this;
+    }
+
+    @Override
+    public boolean enablePipelinedWrite() {
+        return dbOptions.enablePipelinedWrite();
+    }
+
+    @Override
+    public Options setUnorderedWrite(final boolean unorderedWrite) {
+        dbOptions.setUnorderedWrite(unorderedWrite);
+        return this;
+    }
+
+    @Override
+    public boolean unorderedWrite() {
+        return dbOptions.unorderedWrite();
+    }
+
+    @Override
+    public Options setSkipCheckingSstFileSizesOnDbOpen(final boolean skipCheckingSstFileSizesOnDbOpen) {
+        dbOptions.setSkipCheckingSstFileSizesOnDbOpen(skipCheckingSstFileSizesOnDbOpen);
+        return this;
+    }
+
+    @Override
+    public boolean skipCheckingSstFileSizesOnDbOpen() {
+        return dbOptions.skipCheckingSstFileSizesOnDbOpen();
+    }
+
+    @Override
+    public Options setWalFilter(final AbstractWalFilter walFilter) {
+        dbOptions.setWalFilter(walFilter);
+        return this;
+    }
+
+    @Override
+    public WalFilter walFilter() {
+        return dbOptions.walFilter();
+    }
+
+    @Override
+    public Options setAllowIngestBehind(final boolean allowIngestBehind) {
+        dbOptions.setAllowIngestBehind(allowIngestBehind);
+        return this;
+    }
+
+    @Override
+    public boolean allowIngestBehind() {
+        return dbOptions.allowIngestBehind();
+    }
+
+    @Override
+    public Options setPreserveDeletes(final boolean preserveDeletes) {
+        dbOptions.setPreserveDeletes(preserveDeletes);
+        return this;
+    }
+
+    @Override
+    public boolean preserveDeletes() {
+        return dbOptions.preserveDeletes();
+    }
+
+    @Override
+    public Options setTwoWriteQueues(final boolean twoWriteQueues) {
+        dbOptions.setTwoWriteQueues(twoWriteQueues);
+        return this;
+    }
+
+    @Override
+    public boolean twoWriteQueues() {
+        return dbOptions.twoWriteQueues();
+    }
+
+    @Override
+    public Options setManualWalFlush(final boolean manualWalFlush) {
+        dbOptions.setManualWalFlush(manualWalFlush);
+        return this;
+    }
+
+    @Override
+    public boolean manualWalFlush() {
+        return dbOptions.manualWalFlush();
+    }
+
+    @Override
+    public Options setCfPaths(final Collection<DbPath> cfPaths) {
+        columnFamilyOptions.setCfPaths(cfPaths);
+        return this;
+    }
+
+    @Override
+    public List<DbPath> cfPaths() {
+        return columnFamilyOptions.cfPaths();
+    }
+
+    @Override
+    public Options setBottommostCompressionOptions(final CompressionOptions bottommostCompressionOptions) {
+        columnFamilyOptions.setBottommostCompressionOptions(bottommostCompressionOptions);
+        return this;
+    }
+
+    @Override
+    public CompressionOptions bottommostCompressionOptions() {
+        return columnFamilyOptions.bottommostCompressionOptions();
+    }
+
+    @Override
+    public Options setTtl(final long ttl) {
+        columnFamilyOptions.setTtl(ttl);

Review comment:
       According to the comment above, the TTL option under Level compaction does not delete entries that exceed the TTL; it only compacts them. If I remember correctly, the ask was to delete entries once they exceed the time limit.







[GitHub] [kafka] cadonna commented on pull request #10568: KAFKA-8897: Upgrade RocksDB to 6.19.3

Posted by GitBox <gi...@apache.org>.
cadonna commented on pull request #10568:
URL: https://github.com/apache/kafka/pull/10568#issuecomment-833425461


   The ARM build passed.
   Now, I rebased the PR and I will run Streams system tests on it. 





[GitHub] [kafka] guozhangwang commented on pull request #10568: KAFKA-8897: Upgrade RocksDB to 6.19.3

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #10568:
URL: https://github.com/apache/kafka/pull/10568#issuecomment-832855351


   I'm re-triggering the unit tests. @cadonna let me know if you think one green build is sufficient (i.e. whether in the past we were likely to hit the pure virtual function error within a single run).





[GitHub] [kafka] ableegoldman commented on a change in pull request #10568: KAFKA-8897: Upgrade RocksDB to 6.19.3

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #10568:
URL: https://github.com/apache/kafka/pull/10568#discussion_r626971805



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java
##########
@@ -1381,6 +1364,313 @@ public WriteBufferManager writeBufferManager() {
         return dbOptions.writeBufferManager();
     }
 
+    @Override
+    public Options setMaxWriteBatchGroupSizeBytes(final long maxWriteBatchGroupSizeBytes) {
+        dbOptions.setMaxWriteBatchGroupSizeBytes(maxWriteBatchGroupSizeBytes);
+        return this;
+    }
+
+    @Override
+    public long maxWriteBatchGroupSizeBytes() {
+        return dbOptions.maxWriteBatchGroupSizeBytes();
+    }
+
+    @Override
+    public Options oldDefaults(final int majorVersion, final int minorVersion) {
+        columnFamilyOptions.oldDefaults(majorVersion, minorVersion);
+        return this;
+    }
+
+    @Override
+    public Options optimizeForSmallDb(final Cache cache) {
+        return super.optimizeForSmallDb(cache);
+    }
+
+    @Override
+    public AbstractCompactionFilter<? extends AbstractSlice<?>> compactionFilter() {
+        return columnFamilyOptions.compactionFilter();
+    }
+
+    @Override
+    public AbstractCompactionFilterFactory<? extends AbstractCompactionFilter<?>> compactionFilterFactory() {
+        return columnFamilyOptions.compactionFilterFactory();
+    }
+
+    @Override
+    public Options setStatsPersistPeriodSec(final int statsPersistPeriodSec) {
+        dbOptions.setStatsPersistPeriodSec(statsPersistPeriodSec);
+        return this;
+    }
+
+    @Override
+    public int statsPersistPeriodSec() {
+        return dbOptions.statsPersistPeriodSec();
+    }
+
+    @Override
+    public Options setStatsHistoryBufferSize(final long statsHistoryBufferSize) {
+        dbOptions.setStatsHistoryBufferSize(statsHistoryBufferSize);
+        return this;
+    }
+
+    @Override
+    public long statsHistoryBufferSize() {
+        return dbOptions.statsHistoryBufferSize();
+    }
+
+    @Override
+    public Options setStrictBytesPerSync(final boolean strictBytesPerSync) {
+        dbOptions.setStrictBytesPerSync(strictBytesPerSync);
+        return this;
+    }
+
+    @Override
+    public boolean strictBytesPerSync() {
+        return dbOptions.strictBytesPerSync();
+    }
+
+    @Override
+    public Options setListeners(final List<AbstractEventListener> listeners) {
+        dbOptions.setListeners(listeners);
+        return this;
+    }
+
+    @Override
+    public List<AbstractEventListener> listeners() {
+        return dbOptions.listeners();
+    }
+
+    @Override
+    public Options setEnablePipelinedWrite(final boolean enablePipelinedWrite) {
+        dbOptions.setEnablePipelinedWrite(enablePipelinedWrite);
+        return this;
+    }
+
+    @Override
+    public boolean enablePipelinedWrite() {
+        return dbOptions.enablePipelinedWrite();
+    }
+
+    @Override
+    public Options setUnorderedWrite(final boolean unorderedWrite) {
+        dbOptions.setUnorderedWrite(unorderedWrite);
+        return this;
+    }
+
+    @Override
+    public boolean unorderedWrite() {
+        return dbOptions.unorderedWrite();
+    }
+
+    @Override
+    public Options setSkipCheckingSstFileSizesOnDbOpen(final boolean skipCheckingSstFileSizesOnDbOpen) {
+        dbOptions.setSkipCheckingSstFileSizesOnDbOpen(skipCheckingSstFileSizesOnDbOpen);
+        return this;
+    }
+
+    @Override
+    public boolean skipCheckingSstFileSizesOnDbOpen() {
+        return dbOptions.skipCheckingSstFileSizesOnDbOpen();
+    }
+
+    @Override
+    public Options setWalFilter(final AbstractWalFilter walFilter) {
+        dbOptions.setWalFilter(walFilter);
+        return this;
+    }
+
+    @Override
+    public WalFilter walFilter() {
+        return dbOptions.walFilter();
+    }
+
+    @Override
+    public Options setAllowIngestBehind(final boolean allowIngestBehind) {
+        dbOptions.setAllowIngestBehind(allowIngestBehind);
+        return this;
+    }
+
+    @Override
+    public boolean allowIngestBehind() {
+        return dbOptions.allowIngestBehind();
+    }
+
+    @Override
+    public Options setPreserveDeletes(final boolean preserveDeletes) {
+        dbOptions.setPreserveDeletes(preserveDeletes);
+        return this;
+    }
+
+    @Override
+    public boolean preserveDeletes() {
+        return dbOptions.preserveDeletes();
+    }
+
+    @Override
+    public Options setTwoWriteQueues(final boolean twoWriteQueues) {
+        dbOptions.setTwoWriteQueues(twoWriteQueues);
+        return this;
+    }
+
+    @Override
+    public boolean twoWriteQueues() {
+        return dbOptions.twoWriteQueues();
+    }
+
+    @Override
+    public Options setManualWalFlush(final boolean manualWalFlush) {
+        dbOptions.setManualWalFlush(manualWalFlush);
+        return this;
+    }
+
+    @Override
+    public boolean manualWalFlush() {
+        return dbOptions.manualWalFlush();
+    }
+
+    @Override
+    public Options setCfPaths(final Collection<DbPath> cfPaths) {
+        columnFamilyOptions.setCfPaths(cfPaths);
+        return this;
+    }
+
+    @Override
+    public List<DbPath> cfPaths() {
+        return columnFamilyOptions.cfPaths();
+    }
+
+    @Override
+    public Options setBottommostCompressionOptions(final CompressionOptions bottommostCompressionOptions) {
+        columnFamilyOptions.setBottommostCompressionOptions(bottommostCompressionOptions);
+        return this;
+    }
+
+    @Override
+    public CompressionOptions bottommostCompressionOptions() {
+        return columnFamilyOptions.bottommostCompressionOptions();
+    }
+
+    @Override
+    public Options setTtl(final long ttl) {
+        columnFamilyOptions.setTtl(ttl);
+        return this;
+    }
+
+    @Override
+    public long ttl() {
+        return columnFamilyOptions.ttl();
+    }
+
+    @Override
+    public Options setAtomicFlush(final boolean atomicFlush) {
+        dbOptions.setAtomicFlush(atomicFlush);
+        return this;
+    }
+
+    @Override
+    public boolean atomicFlush() {
+        return dbOptions.atomicFlush();
+    }
+
+    @Override
+    public Options setAvoidUnnecessaryBlockingIO(final boolean avoidUnnecessaryBlockingIO) {
+        dbOptions.setAvoidUnnecessaryBlockingIO(avoidUnnecessaryBlockingIO);
+        return this;
+    }
+
+    @Override
+    public boolean avoidUnnecessaryBlockingIO() {
+        return dbOptions.avoidUnnecessaryBlockingIO();
+    }
+
+    @Override
+    public Options setPersistStatsToDisk(final boolean persistStatsToDisk) {
+        dbOptions.setPersistStatsToDisk(persistStatsToDisk);
+        return this;
+    }
+
+    @Override
+    public boolean persistStatsToDisk() {
+        return dbOptions.persistStatsToDisk();
+    }
+
+    @Override
+    public Options setWriteDbidToManifest(final boolean writeDbidToManifest) {
+        dbOptions.setWriteDbidToManifest(writeDbidToManifest);
+        return this;
+    }
+
+    @Override
+    public boolean writeDbidToManifest() {
+        return dbOptions.writeDbidToManifest();
+    }
+
+    @Override
+    public Options setLogReadaheadSize(final long logReadaheadSize) {
+        dbOptions.setLogReadaheadSize(logReadaheadSize);
+        return this;
+    }
+
+    @Override
+    public long logReadaheadSize() {
+        return dbOptions.logReadaheadSize();
+    }
+
+    @Override
+    public Options setBestEffortsRecovery(final boolean bestEffortsRecovery) {
+        dbOptions.setBestEffortsRecovery(bestEffortsRecovery);
+        return this;
+    }
+
+    @Override
+    public boolean bestEffortsRecovery() {
+        return dbOptions.bestEffortsRecovery();
+    }
+
+    @Override
+    public Options setMaxBgErrorResumeCount(final int maxBgerrorResumeCount) {
+        dbOptions.setMaxBgErrorResumeCount(maxBgerrorResumeCount);
+        return this;
+    }
+
+    @Override
+    public int maxBgerrorResumeCount() {
+        return dbOptions.maxBgerrorResumeCount();
+    }
+
+    @Override
+    public Options setBgerrorResumeRetryInterval(final long bgerrorResumeRetryInterval) {
+        dbOptions.setBgerrorResumeRetryInterval(bgerrorResumeRetryInterval);
+        return this;
+    }
+
+    @Override
+    public long bgerrorResumeRetryInterval() {
+        return dbOptions.bgerrorResumeRetryInterval();
+    }
+
+    @Override
+    public Options setSstPartitionerFactory(final SstPartitionerFactory sstPartitionerFactory) {
+        columnFamilyOptions.setSstPartitionerFactory(sstPartitionerFactory);
+        return this;
+    }
+
+    @Override
+    public SstPartitionerFactory sstPartitionerFactory() {
+        return columnFamilyOptions.sstPartitionerFactory();
+    }
+
+    @Override
+    public Options setCompactionThreadLimiter(final ConcurrentTaskLimiter compactionThreadLimiter) {

Review comment:
       Huh. I wonder why you would want to limit the number of outstanding compactions -- and what would happen if you hit that limit. Would rocksdb stall writes until it could clear some of the queued compactions? 🤷‍♀️ 







[GitHub] [kafka] ableegoldman commented on a change in pull request #10568: KAFKA-8897: Upgrade RocksDB to 6.19.3

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #10568:
URL: https://github.com/apache/kafka/pull/10568#discussion_r626176038



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
@@ -396,6 +396,21 @@ public void close() {
                 log.info("Skipping to close non-initialized store {}", entry.getKey());
             }
         }
+        for (final StateStore store : globalStateStores) {

Review comment:
       Heh, @guozhangwang and I reviewed at the same time. I didn't notice that `globalStores` would not be populated until restoration, whereas `globalStateStores` is populated in the constructor. Imo we should just populate `globalStores` in the constructor as well, but I guess that won't be necessary if @guozhangwang does a quick followup to consolidate them







[GitHub] [kafka] guozhangwang merged pull request #10568: KAFKA-8897: Upgrade RocksDB to 6.19.3

Posted by GitBox <gi...@apache.org>.
guozhangwang merged pull request #10568:
URL: https://github.com/apache/kafka/pull/10568


   





[GitHub] [kafka] cadonna commented on pull request #10568: KAFKA-8897: Upgrade RocksDB to 6.19.3

Posted by GitBox <gi...@apache.org>.
cadonna commented on pull request #10568:
URL: https://github.com/apache/kafka/pull/10568#issuecomment-833358757


   I checked the last 10 PRs that were opened for the AK repo, and 7 of them have `pure virtual method called` in the ARM build around the tests that showed the last leak I fixed. 1 of the ARM builds that did not contain `pure virtual method called` failed for other reasons. 2 were successful ARM builds. 
   I restarted the builds on this PR. If the builds do not fail with SIGABRT (i.e. `pure virtual method called`), I am pretty confident that we can merge.





[GitHub] [kafka] cadonna commented on a change in pull request #10568: KAFKA-8897: Upgrade RocksDB to 6.16.4

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #10568:
URL: https://github.com/apache/kafka/pull/10568#discussion_r626483644



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java
##########
@@ -1381,6 +1364,313 @@ public WriteBufferManager writeBufferManager() {
         return dbOptions.writeBufferManager();
     }
 
+    @Override
+    public Options setMaxWriteBatchGroupSizeBytes(final long maxWriteBatchGroupSizeBytes) {
+        dbOptions.setMaxWriteBatchGroupSizeBytes(maxWriteBatchGroupSizeBytes);
+        return this;
+    }
+
+    @Override
+    public long maxWriteBatchGroupSizeBytes() {
+        return dbOptions.maxWriteBatchGroupSizeBytes();
+    }
+
+    @Override
+    public Options oldDefaults(final int majorVersion, final int minorVersion) {
+        columnFamilyOptions.oldDefaults(majorVersion, minorVersion);
+        return this;
+    }
+
+    @Override
+    public Options optimizeForSmallDb(final Cache cache) {
+        return super.optimizeForSmallDb(cache);
+    }
+
+    @Override
+    public AbstractCompactionFilter<? extends AbstractSlice<?>> compactionFilter() {
+        return columnFamilyOptions.compactionFilter();
+    }
+
+    @Override
+    public AbstractCompactionFilterFactory<? extends AbstractCompactionFilter<?>> compactionFilterFactory() {
+        return columnFamilyOptions.compactionFilterFactory();
+    }
+
+    @Override
+    public Options setStatsPersistPeriodSec(final int statsPersistPeriodSec) {
+        dbOptions.setStatsPersistPeriodSec(statsPersistPeriodSec);
+        return this;
+    }
+
+    @Override
+    public int statsPersistPeriodSec() {
+        return dbOptions.statsPersistPeriodSec();
+    }
+
+    @Override
+    public Options setStatsHistoryBufferSize(final long statsHistoryBufferSize) {
+        dbOptions.setStatsHistoryBufferSize(statsHistoryBufferSize);
+        return this;
+    }
+
+    @Override
+    public long statsHistoryBufferSize() {
+        return dbOptions.statsHistoryBufferSize();
+    }
+
+    @Override
+    public Options setStrictBytesPerSync(final boolean strictBytesPerSync) {
+        dbOptions.setStrictBytesPerSync(strictBytesPerSync);
+        return this;
+    }
+
+    @Override
+    public boolean strictBytesPerSync() {
+        return dbOptions.strictBytesPerSync();
+    }
+
+    @Override
+    public Options setListeners(final List<AbstractEventListener> listeners) {
+        dbOptions.setListeners(listeners);
+        return this;
+    }
+
+    @Override
+    public List<AbstractEventListener> listeners() {
+        return dbOptions.listeners();
+    }
+
+    @Override
+    public Options setEnablePipelinedWrite(final boolean enablePipelinedWrite) {
+        dbOptions.setEnablePipelinedWrite(enablePipelinedWrite);
+        return this;
+    }
+
+    @Override
+    public boolean enablePipelinedWrite() {
+        return dbOptions.enablePipelinedWrite();
+    }
+
+    @Override
+    public Options setUnorderedWrite(final boolean unorderedWrite) {
+        dbOptions.setUnorderedWrite(unorderedWrite);
+        return this;
+    }
+
+    @Override
+    public boolean unorderedWrite() {
+        return dbOptions.unorderedWrite();
+    }
+
+    @Override
+    public Options setSkipCheckingSstFileSizesOnDbOpen(final boolean skipCheckingSstFileSizesOnDbOpen) {
+        dbOptions.setSkipCheckingSstFileSizesOnDbOpen(skipCheckingSstFileSizesOnDbOpen);
+        return this;
+    }
+
+    @Override
+    public boolean skipCheckingSstFileSizesOnDbOpen() {
+        return dbOptions.skipCheckingSstFileSizesOnDbOpen();
+    }
+
+    @Override
+    public Options setWalFilter(final AbstractWalFilter walFilter) {
+        dbOptions.setWalFilter(walFilter);
+        return this;
+    }
+
+    @Override
+    public WalFilter walFilter() {
+        return dbOptions.walFilter();
+    }
+
+    @Override
+    public Options setAllowIngestBehind(final boolean allowIngestBehind) {
+        dbOptions.setAllowIngestBehind(allowIngestBehind);
+        return this;
+    }
+
+    @Override
+    public boolean allowIngestBehind() {
+        return dbOptions.allowIngestBehind();
+    }
+
+    @Override
+    public Options setPreserveDeletes(final boolean preserveDeletes) {
+        dbOptions.setPreserveDeletes(preserveDeletes);
+        return this;
+    }
+
+    @Override
+    public boolean preserveDeletes() {
+        return dbOptions.preserveDeletes();
+    }
+
+    @Override
+    public Options setTwoWriteQueues(final boolean twoWriteQueues) {
+        dbOptions.setTwoWriteQueues(twoWriteQueues);
+        return this;
+    }
+
+    @Override
+    public boolean twoWriteQueues() {
+        return dbOptions.twoWriteQueues();
+    }
+
+    @Override
+    public Options setManualWalFlush(final boolean manualWalFlush) {
+        dbOptions.setManualWalFlush(manualWalFlush);
+        return this;
+    }
+
+    @Override
+    public boolean manualWalFlush() {
+        return dbOptions.manualWalFlush();
+    }
+
+    @Override
+    public Options setCfPaths(final Collection<DbPath> cfPaths) {
+        columnFamilyOptions.setCfPaths(cfPaths);
+        return this;
+    }
+
+    @Override
+    public List<DbPath> cfPaths() {
+        return columnFamilyOptions.cfPaths();
+    }
+
+    @Override
+    public Options setBottommostCompressionOptions(final CompressionOptions bottommostCompressionOptions) {
+        columnFamilyOptions.setBottommostCompressionOptions(bottommostCompressionOptions);
+        return this;
+    }
+
+    @Override
+    public CompressionOptions bottommostCompressionOptions() {
+        return columnFamilyOptions.bottommostCompressionOptions();
+    }
+
+    @Override
+    public Options setTtl(final long ttl) {
+        columnFamilyOptions.setTtl(ttl);

Review comment:
       Comments in the RocksDB code state:
   
   ```
   // In Level: Non-bottom-level files older than TTL will go through the
   //           compaction process.
   // In FIFO: Files older than TTL will be deleted.
   ```
   
   So FIFO compaction is not a prerequisite.







[GitHub] [kafka] cadonna commented on a change in pull request #10568: KAFKA-8897: Upgrade RocksDB to 6.16.4

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #10568:
URL: https://github.com/apache/kafka/pull/10568#discussion_r626486541



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java
##########
@@ -1381,6 +1364,313 @@ public WriteBufferManager writeBufferManager() {
         return dbOptions.writeBufferManager();
     }
 
+    @Override
+    public Options setMaxWriteBatchGroupSizeBytes(final long maxWriteBatchGroupSizeBytes) {
+        dbOptions.setMaxWriteBatchGroupSizeBytes(maxWriteBatchGroupSizeBytes);
+        return this;
+    }
+
+    @Override
+    public long maxWriteBatchGroupSizeBytes() {
+        return dbOptions.maxWriteBatchGroupSizeBytes();
+    }
+
+    @Override
+    public Options oldDefaults(final int majorVersion, final int minorVersion) {
+        columnFamilyOptions.oldDefaults(majorVersion, minorVersion);
+        return this;
+    }
+
+    @Override
+    public Options optimizeForSmallDb(final Cache cache) {
+        return super.optimizeForSmallDb(cache);
+    }
+
+    @Override
+    public AbstractCompactionFilter<? extends AbstractSlice<?>> compactionFilter() {
+        return columnFamilyOptions.compactionFilter();
+    }
+
+    @Override
+    public AbstractCompactionFilterFactory<? extends AbstractCompactionFilter<?>> compactionFilterFactory() {
+        return columnFamilyOptions.compactionFilterFactory();
+    }
+
+    @Override
+    public Options setStatsPersistPeriodSec(final int statsPersistPeriodSec) {
+        dbOptions.setStatsPersistPeriodSec(statsPersistPeriodSec);
+        return this;
+    }
+
+    @Override
+    public int statsPersistPeriodSec() {
+        return dbOptions.statsPersistPeriodSec();
+    }
+
+    @Override
+    public Options setStatsHistoryBufferSize(final long statsHistoryBufferSize) {
+        dbOptions.setStatsHistoryBufferSize(statsHistoryBufferSize);
+        return this;
+    }
+
+    @Override
+    public long statsHistoryBufferSize() {
+        return dbOptions.statsHistoryBufferSize();
+    }
+
+    @Override
+    public Options setStrictBytesPerSync(final boolean strictBytesPerSync) {
+        dbOptions.setStrictBytesPerSync(strictBytesPerSync);
+        return this;
+    }
+
+    @Override
+    public boolean strictBytesPerSync() {
+        return dbOptions.strictBytesPerSync();
+    }
+
+    @Override
+    public Options setListeners(final List<AbstractEventListener> listeners) {
+        dbOptions.setListeners(listeners);
+        return this;
+    }
+
+    @Override
+    public List<AbstractEventListener> listeners() {
+        return dbOptions.listeners();
+    }
+
+    @Override
+    public Options setEnablePipelinedWrite(final boolean enablePipelinedWrite) {
+        dbOptions.setEnablePipelinedWrite(enablePipelinedWrite);
+        return this;
+    }
+
+    @Override
+    public boolean enablePipelinedWrite() {
+        return dbOptions.enablePipelinedWrite();
+    }
+
+    @Override
+    public Options setUnorderedWrite(final boolean unorderedWrite) {
+        dbOptions.setUnorderedWrite(unorderedWrite);
+        return this;
+    }
+
+    @Override
+    public boolean unorderedWrite() {
+        return dbOptions.unorderedWrite();
+    }
+
+    @Override
+    public Options setSkipCheckingSstFileSizesOnDbOpen(final boolean skipCheckingSstFileSizesOnDbOpen) {
+        dbOptions.setSkipCheckingSstFileSizesOnDbOpen(skipCheckingSstFileSizesOnDbOpen);
+        return this;
+    }
+
+    @Override
+    public boolean skipCheckingSstFileSizesOnDbOpen() {
+        return dbOptions.skipCheckingSstFileSizesOnDbOpen();
+    }
+
+    @Override
+    public Options setWalFilter(final AbstractWalFilter walFilter) {
+        dbOptions.setWalFilter(walFilter);
+        return this;
+    }
+
+    @Override
+    public WalFilter walFilter() {
+        return dbOptions.walFilter();
+    }
+
+    @Override
+    public Options setAllowIngestBehind(final boolean allowIngestBehind) {
+        dbOptions.setAllowIngestBehind(allowIngestBehind);
+        return this;
+    }
+
+    @Override
+    public boolean allowIngestBehind() {
+        return dbOptions.allowIngestBehind();
+    }
+
+    @Override
+    public Options setPreserveDeletes(final boolean preserveDeletes) {
+        dbOptions.setPreserveDeletes(preserveDeletes);
+        return this;
+    }
+
+    @Override
+    public boolean preserveDeletes() {
+        return dbOptions.preserveDeletes();
+    }
+
+    @Override
+    public Options setTwoWriteQueues(final boolean twoWriteQueues) {
+        dbOptions.setTwoWriteQueues(twoWriteQueues);
+        return this;
+    }
+
+    @Override
+    public boolean twoWriteQueues() {
+        return dbOptions.twoWriteQueues();
+    }
+
+    @Override
+    public Options setManualWalFlush(final boolean manualWalFlush) {
+        dbOptions.setManualWalFlush(manualWalFlush);
+        return this;
+    }
+
+    @Override
+    public boolean manualWalFlush() {
+        return dbOptions.manualWalFlush();
+    }
+
+    @Override
+    public Options setCfPaths(final Collection<DbPath> cfPaths) {
+        columnFamilyOptions.setCfPaths(cfPaths);
+        return this;
+    }
+
+    @Override
+    public List<DbPath> cfPaths() {
+        return columnFamilyOptions.cfPaths();
+    }
+
+    @Override
+    public Options setBottommostCompressionOptions(final CompressionOptions bottommostCompressionOptions) {
+        columnFamilyOptions.setBottommostCompressionOptions(bottommostCompressionOptions);
+        return this;
+    }
+
+    @Override
+    public CompressionOptions bottommostCompressionOptions() {
+        return columnFamilyOptions.bottommostCompressionOptions();
+    }
+
+    @Override
+    public Options setTtl(final long ttl) {
+        columnFamilyOptions.setTtl(ttl);

Review comment:
       Testing does no harm, but I would avoid testing too many different combinations, because this has the potential to become a maintenance hell.







[GitHub] [kafka] cadonna commented on pull request #10568: KAFKA-8897: Upgrade RocksDB to 6.19.3

Posted by GitBox <gi...@apache.org>.
cadonna commented on pull request #10568:
URL: https://github.com/apache/kafka/pull/10568#issuecomment-832644717


   I increased the RocksDB version since I found a more recent version on Maven Central.





[GitHub] [kafka] codefactor commented on pull request #10568: KAFKA-8897: Upgrade RocksDB to 6.19.3

Posted by GitBox <gi...@apache.org>.
codefactor commented on pull request #10568:
URL: https://github.com/apache/kafka/pull/10568#issuecomment-892269776


   @ableegoldman,
   Thanks a lot. Do you have any links where I can find out the following:
   1. Are there any breaking changes if I upgrade from version 2.6 to 3.0?
   2. When is version 3.0 scheduled to be released to the Maven Central repository?





[GitHub] [kafka] ableegoldman commented on pull request #10568: KAFKA-8897: Upgrade RocksDB to 6.19.3

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #10568:
URL: https://github.com/apache/kafka/pull/10568#issuecomment-892279085


   1) Possibly -- you can check out the [Streams upgrade guide](https://kafka.apache.org/28/documentation/streams/upgrade-guide), which describes any public API changes in each version. The section for 3.0 won't appear until it's released, but you can always poke around the [source html for 3.0 changes](https://github.com/apache/kafka/blob/4eb72add11b548e3fe8fea72856af49dc950e444/docs/streams/upgrade-guide.html#L97) if you want to be prepared.
   
   2) Good question -- depends on how quickly the remaining open items can be addressed and if there are any new issues found during the two week testing period. The [3.0 release plan](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=177046466) lists the unresolved issues targeted for 3.0, and you can subscribe to the mailing list if you want to keep up with the release progress.





[GitHub] [kafka] cadonna commented on a change in pull request #10568: [DO NOT MERGE] KAFKA-8897: Upgrade RocksDB

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #10568:
URL: https://github.com/apache/kafka/pull/10568#discussion_r626032529



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
@@ -396,6 +396,21 @@ public void close() {
                 log.info("Skipping to close non-initialized store {}", entry.getKey());
             }
         }
+        for (final StateStore store : globalStateStores) {

Review comment:
       We have two data structures that contain state stores, but we check only one of them when we close the state stores. In the error case, one of them can contain an open store that the other does not. This is a quick fix for that, but we should consider whether it is possible to consolidate these two data structures.
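   
   A minimal sketch of the idea behind this quick fix, not the actual change. All types below (`SketchStore`, `InMemorySketchStore`, `TwoCollectionCloser`) are hypothetical stand-ins, not Kafka Streams classes. It walks both collections and closes every store that is still open, visiting each instance only once:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for a state store; NOT the Kafka Streams StateStore interface.
interface SketchStore {
    String name();
    boolean isOpen();
    void close();
}

// Trivial in-memory implementation used only for illustration.
final class InMemorySketchStore implements SketchStore {
    private final String name;
    private boolean open = true;

    InMemorySketchStore(final String name) {
        this.name = name;
    }

    public String name() {
        return name;
    }

    public boolean isOpen() {
        return open;
    }

    public void close() {
        open = false;
    }
}

final class TwoCollectionCloser {
    // Closes every open store found in either collection, visiting each instance once.
    // Returns the names of the stores that were closed, in visiting order.
    static List<String> closeAll(final Map<String, SketchStore> initialized,
                                 final List<SketchStore> registered) {
        final Set<SketchStore> seen =
            Collections.newSetFromMap(new IdentityHashMap<SketchStore, Boolean>());
        final List<SketchStore> candidates = new ArrayList<>(initialized.values());
        candidates.addAll(registered);

        final List<String> closed = new ArrayList<>();
        for (final SketchStore store : candidates) {
            // seen.add returns false for an instance that was already visited
            if (seen.add(store) && store.isOpen()) {
                store.close();
                closed.add(store.name());
            }
        }
        return closed;
    }
}
```

   The identity-based set is an assumption made for the sketch: it avoids closing the same instance twice when it appears in both collections. Consolidating the two collections would make this deduplication unnecessary.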







[GitHub] [kafka] guozhangwang commented on pull request #10568: KAFKA-8897: Upgrade RocksDB to 6.19.3

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #10568:
URL: https://github.com/apache/kafka/pull/10568#issuecomment-833915222


   Merged to trunk.





[GitHub] [kafka] ableegoldman commented on a change in pull request #10568: KAFKA-8897: Upgrade RocksDB to 6.19.3

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #10568:
URL: https://github.com/apache/kafka/pull/10568#discussion_r626990213



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java
##########
@@ -1381,6 +1364,313 @@ public WriteBufferManager writeBufferManager() {
         return dbOptions.writeBufferManager();
     }
 
+    @Override
+    public Options setMaxWriteBatchGroupSizeBytes(final long maxWriteBatchGroupSizeBytes) {
+        dbOptions.setMaxWriteBatchGroupSizeBytes(maxWriteBatchGroupSizeBytes);
+        return this;
+    }
+
+    @Override
+    public long maxWriteBatchGroupSizeBytes() {
+        return dbOptions.maxWriteBatchGroupSizeBytes();
+    }
+
+    @Override
+    public Options oldDefaults(final int majorVersion, final int minorVersion) {
+        columnFamilyOptions.oldDefaults(majorVersion, minorVersion);
+        return this;
+    }
+
+    @Override
+    public Options optimizeForSmallDb(final Cache cache) {
+        return super.optimizeForSmallDb(cache);
+    }
+
+    @Override
+    public AbstractCompactionFilter<? extends AbstractSlice<?>> compactionFilter() {
+        return columnFamilyOptions.compactionFilter();
+    }
+
+    @Override
+    public AbstractCompactionFilterFactory<? extends AbstractCompactionFilter<?>> compactionFilterFactory() {
+        return columnFamilyOptions.compactionFilterFactory();
+    }
+
+    @Override
+    public Options setStatsPersistPeriodSec(final int statsPersistPeriodSec) {
+        dbOptions.setStatsPersistPeriodSec(statsPersistPeriodSec);
+        return this;
+    }
+
+    @Override
+    public int statsPersistPeriodSec() {
+        return dbOptions.statsPersistPeriodSec();
+    }
+
+    @Override
+    public Options setStatsHistoryBufferSize(final long statsHistoryBufferSize) {
+        dbOptions.setStatsHistoryBufferSize(statsHistoryBufferSize);
+        return this;
+    }
+
+    @Override
+    public long statsHistoryBufferSize() {
+        return dbOptions.statsHistoryBufferSize();
+    }
+
+    @Override
+    public Options setStrictBytesPerSync(final boolean strictBytesPerSync) {
+        dbOptions.setStrictBytesPerSync(strictBytesPerSync);
+        return this;
+    }
+
+    @Override
+    public boolean strictBytesPerSync() {
+        return dbOptions.strictBytesPerSync();
+    }
+
+    @Override
+    public Options setListeners(final List<AbstractEventListener> listeners) {
+        dbOptions.setListeners(listeners);
+        return this;
+    }
+
+    @Override
+    public List<AbstractEventListener> listeners() {
+        return dbOptions.listeners();
+    }
+
+    @Override
+    public Options setEnablePipelinedWrite(final boolean enablePipelinedWrite) {
+        dbOptions.setEnablePipelinedWrite(enablePipelinedWrite);
+        return this;
+    }
+
+    @Override
+    public boolean enablePipelinedWrite() {
+        return dbOptions.enablePipelinedWrite();
+    }
+
+    @Override
+    public Options setUnorderedWrite(final boolean unorderedWrite) {
+        dbOptions.setUnorderedWrite(unorderedWrite);
+        return this;
+    }
+
+    @Override
+    public boolean unorderedWrite() {
+        return dbOptions.unorderedWrite();
+    }
+
+    @Override
+    public Options setSkipCheckingSstFileSizesOnDbOpen(final boolean skipCheckingSstFileSizesOnDbOpen) {
+        dbOptions.setSkipCheckingSstFileSizesOnDbOpen(skipCheckingSstFileSizesOnDbOpen);
+        return this;
+    }
+
+    @Override
+    public boolean skipCheckingSstFileSizesOnDbOpen() {
+        return dbOptions.skipCheckingSstFileSizesOnDbOpen();
+    }
+
+    @Override
+    public Options setWalFilter(final AbstractWalFilter walFilter) {
+        dbOptions.setWalFilter(walFilter);

Review comment:
       Yep, in RocksDBStore we set `wOptions.setDisableWAL(true)` -- it's not on the DB/CF options; instead, it's a configuration on a separate WriteOptions class. So we do indeed enforce that it's disabled.
   







[GitHub] [kafka] ableegoldman commented on a change in pull request #10568: KAFKA-8897: Upgrade RocksDB to 6.19.3

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #10568:
URL: https://github.com/apache/kafka/pull/10568#discussion_r626989130



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java
##########
@@ -1381,6 +1364,313 @@ public WriteBufferManager writeBufferManager() {
         return dbOptions.writeBufferManager();
     }
 
+    @Override
+    public Options setMaxWriteBatchGroupSizeBytes(final long maxWriteBatchGroupSizeBytes) {
+        dbOptions.setMaxWriteBatchGroupSizeBytes(maxWriteBatchGroupSizeBytes);
+        return this;
+    }
+
+    @Override
+    public long maxWriteBatchGroupSizeBytes() {
+        return dbOptions.maxWriteBatchGroupSizeBytes();
+    }
+
+    @Override
+    public Options oldDefaults(final int majorVersion, final int minorVersion) {
+        columnFamilyOptions.oldDefaults(majorVersion, minorVersion);
+        return this;
+    }
+
+    @Override
+    public Options optimizeForSmallDb(final Cache cache) {
+        return super.optimizeForSmallDb(cache);
+    }
+
+    @Override
+    public AbstractCompactionFilter<? extends AbstractSlice<?>> compactionFilter() {
+        return columnFamilyOptions.compactionFilter();
+    }
+
+    @Override
+    public AbstractCompactionFilterFactory<? extends AbstractCompactionFilter<?>> compactionFilterFactory() {
+        return columnFamilyOptions.compactionFilterFactory();
+    }
+
+    @Override
+    public Options setStatsPersistPeriodSec(final int statsPersistPeriodSec) {
+        dbOptions.setStatsPersistPeriodSec(statsPersistPeriodSec);
+        return this;
+    }
+
+    @Override
+    public int statsPersistPeriodSec() {
+        return dbOptions.statsPersistPeriodSec();
+    }
+
+    @Override
+    public Options setStatsHistoryBufferSize(final long statsHistoryBufferSize) {
+        dbOptions.setStatsHistoryBufferSize(statsHistoryBufferSize);
+        return this;
+    }
+
+    @Override
+    public long statsHistoryBufferSize() {
+        return dbOptions.statsHistoryBufferSize();
+    }
+
+    @Override
+    public Options setStrictBytesPerSync(final boolean strictBytesPerSync) {
+        dbOptions.setStrictBytesPerSync(strictBytesPerSync);
+        return this;
+    }
+
+    @Override
+    public boolean strictBytesPerSync() {
+        return dbOptions.strictBytesPerSync();
+    }
+
+    @Override
+    public Options setListeners(final List<AbstractEventListener> listeners) {
+        dbOptions.setListeners(listeners);
+        return this;
+    }
+
+    @Override
+    public List<AbstractEventListener> listeners() {
+        return dbOptions.listeners();
+    }
+
+    @Override
+    public Options setEnablePipelinedWrite(final boolean enablePipelinedWrite) {
+        dbOptions.setEnablePipelinedWrite(enablePipelinedWrite);
+        return this;
+    }
+
+    @Override
+    public boolean enablePipelinedWrite() {
+        return dbOptions.enablePipelinedWrite();
+    }
+
+    @Override
+    public Options setUnorderedWrite(final boolean unorderedWrite) {
+        dbOptions.setUnorderedWrite(unorderedWrite);
+        return this;
+    }
+
+    @Override
+    public boolean unorderedWrite() {
+        return dbOptions.unorderedWrite();
+    }
+
+    @Override
+    public Options setSkipCheckingSstFileSizesOnDbOpen(final boolean skipCheckingSstFileSizesOnDbOpen) {
+        dbOptions.setSkipCheckingSstFileSizesOnDbOpen(skipCheckingSstFileSizesOnDbOpen);
+        return this;
+    }
+
+    @Override
+    public boolean skipCheckingSstFileSizesOnDbOpen() {
+        return dbOptions.skipCheckingSstFileSizesOnDbOpen();
+    }
+
+    @Override
+    public Options setWalFilter(final AbstractWalFilter walFilter) {
+        dbOptions.setWalFilter(walFilter);

Review comment:
       I thought we did actively disable it, although I'll see if I can find where/whether this is done.







[GitHub] [kafka] cadonna commented on a change in pull request #10568: KAFKA-8897: Upgrade RocksDB to 6.16.4

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #10568:
URL: https://github.com/apache/kafka/pull/10568#discussion_r626506480



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java
##########
@@ -99,35 +104,7 @@ public Env getEnv() {
 
     @Override
     public Options prepareForBulkLoad() {
-        /* From https://github.com/facebook/rocksdb/wiki/RocksDB-FAQ
-         *
-         * Q: What's the fastest way to load data into RocksDB?
-         *
-         * A: A fast way to direct insert data to the DB:
-         *
-         *  1. using single writer thread and insert in sorted order
-         *  2. batch hundreds of keys into one write batch
-         *  3. use vector memtable
-         *  4. make sure options.max_background_flushes is at least 4
-         *  5. before inserting the data,
-         *       disable automatic compaction,
-         *       set options.level0_file_num_compaction_trigger,
-         *           options.level0_slowdown_writes_trigger
-         *           and options.level0_stop_writes_trigger to very large.
-         *     After inserting all the data, issue a manual compaction.
-         *
-         * 3-5 will be automatically done if you call Options::PrepareForBulkLoad() to your option
-         */
-        // (1) not in our control
-        // (2) is done via bulk-loading API
-        // (3) skipping because, not done in actual PrepareForBulkLoad() code in https://github.com/facebook/rocksdb/blob/master/options/options.cc
-        //columnFamilyOptions.setMemTableConfig(new VectorMemTableConfig());
-        // (4-5) below:
-        dbOptions.setMaxBackgroundFlushes(4);
-        columnFamilyOptions.setDisableAutoCompactions(true);
-        columnFamilyOptions.setLevel0FileNumCompactionTrigger(1 << 30);
-        columnFamilyOptions.setLevel0SlowdownWritesTrigger(1 << 30);
-        columnFamilyOptions.setLevel0StopWritesTrigger(1 << 30);

Review comment:
       TBH, I do not understand why we set the options here instead of simply calling super.prepareForBulkLoad(). Also, in the version that we currently use (5.18.4), the options are set the same way as here.
   
   What value would there be in leaving a comment with the RocksDB version? I guess we will not downgrade RocksDB below the version we currently use, right?
   







[GitHub] [kafka] cadonna commented on a change in pull request #10568: KAFKA-8897: Upgrade RocksDB to 6.16.4

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #10568:
URL: https://github.com/apache/kafka/pull/10568#discussion_r626511978



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
##########
@@ -189,16 +199,6 @@ public void shouldCloseStateStoresOnClose() throws Exception {
         assertFalse(globalStore.isOpen());
     }
 
-    @Test
-    public void shouldTransitionToDeadOnClose() throws Exception {

Review comment:
       Because test `shouldStopRunningWhenClosedByUser()` above is exactly the same test.







[GitHub] [kafka] codefactor commented on pull request #10568: KAFKA-8897: Upgrade RocksDB to 6.19.3

Posted by GitBox <gi...@apache.org>.
codefactor commented on pull request #10568:
URL: https://github.com/apache/kafka/pull/10568#issuecomment-892207657


   Are there any plans to release this?
   
   It looks like this upgrade happens to include the fix for the following RocksDB issue:
   https://github.com/facebook/rocksdb/issues/6703
   
   That fixes 2 CVEs:
   https://nvd.nist.gov/vuln/detail/CVE-2019-12900
   https://nvd.nist.gov/vuln/detail/CVE-2016-3189
   
   My application is blocked by security scans due to these, and we need a new release of kafka-streams to get those fixes.





[GitHub] [kafka] cadonna commented on pull request #10568: KAFKA-8897: Upgrade RocksDB to 6.16.4

Posted by GitBox <gi...@apache.org>.
cadonna commented on pull request #10568:
URL: https://github.com/apache/kafka/pull/10568#issuecomment-832643004


   OK, the other builds failed due to test failures, not due to SIGABRT. That is a good sign.





[GitHub] [kafka] cadonna commented on pull request #10568: KAFKA-8897: Upgrade RocksDB to 6.16.4

Posted by GitBox <gi...@apache.org>.
cadonna commented on pull request #10568:
URL: https://github.com/apache/kafka/pull/10568#issuecomment-832627792


   I found leaks in the state store wrappers. 
   The ARM build was successful without any `pure virtual method called`. Let's see how the other builds end. 🤞 





[GitHub] [kafka] cadonna commented on a change in pull request #10568: KAFKA-8897: Upgrade RocksDB to 6.19.3

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #10568:
URL: https://github.com/apache/kafka/pull/10568#discussion_r627228005



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java
##########
@@ -1381,6 +1364,313 @@ public WriteBufferManager writeBufferManager() {
         return dbOptions.writeBufferManager();
     }
 
+    @Override
+    public Options setMaxWriteBatchGroupSizeBytes(final long maxWriteBatchGroupSizeBytes) {
+        dbOptions.setMaxWriteBatchGroupSizeBytes(maxWriteBatchGroupSizeBytes);
+        return this;
+    }
+
+    @Override
+    public long maxWriteBatchGroupSizeBytes() {
+        return dbOptions.maxWriteBatchGroupSizeBytes();
+    }
+
+    @Override
+    public Options oldDefaults(final int majorVersion, final int minorVersion) {
+        columnFamilyOptions.oldDefaults(majorVersion, minorVersion);
+        return this;
+    }
+
+    @Override
+    public Options optimizeForSmallDb(final Cache cache) {
+        return super.optimizeForSmallDb(cache);
+    }
+
+    @Override
+    public AbstractCompactionFilter<? extends AbstractSlice<?>> compactionFilter() {
+        return columnFamilyOptions.compactionFilter();
+    }
+
+    @Override
+    public AbstractCompactionFilterFactory<? extends AbstractCompactionFilter<?>> compactionFilterFactory() {
+        return columnFamilyOptions.compactionFilterFactory();
+    }
+
+    @Override
+    public Options setStatsPersistPeriodSec(final int statsPersistPeriodSec) {
+        dbOptions.setStatsPersistPeriodSec(statsPersistPeriodSec);
+        return this;
+    }
+
+    @Override
+    public int statsPersistPeriodSec() {
+        return dbOptions.statsPersistPeriodSec();
+    }
+
+    @Override
+    public Options setStatsHistoryBufferSize(final long statsHistoryBufferSize) {
+        dbOptions.setStatsHistoryBufferSize(statsHistoryBufferSize);
+        return this;
+    }
+
+    @Override
+    public long statsHistoryBufferSize() {
+        return dbOptions.statsHistoryBufferSize();
+    }
+
+    @Override
+    public Options setStrictBytesPerSync(final boolean strictBytesPerSync) {
+        dbOptions.setStrictBytesPerSync(strictBytesPerSync);
+        return this;
+    }
+
+    @Override
+    public boolean strictBytesPerSync() {
+        return dbOptions.strictBytesPerSync();
+    }
+
+    @Override
+    public Options setListeners(final List<AbstractEventListener> listeners) {
+        dbOptions.setListeners(listeners);
+        return this;
+    }
+
+    @Override
+    public List<AbstractEventListener> listeners() {
+        return dbOptions.listeners();
+    }
+
+    @Override
+    public Options setEnablePipelinedWrite(final boolean enablePipelinedWrite) {
+        dbOptions.setEnablePipelinedWrite(enablePipelinedWrite);
+        return this;
+    }
+
+    @Override
+    public boolean enablePipelinedWrite() {
+        return dbOptions.enablePipelinedWrite();
+    }
+
+    @Override
+    public Options setUnorderedWrite(final boolean unorderedWrite) {
+        dbOptions.setUnorderedWrite(unorderedWrite);
+        return this;
+    }
+
+    @Override
+    public boolean unorderedWrite() {
+        return dbOptions.unorderedWrite();
+    }
+
+    @Override
+    public Options setSkipCheckingSstFileSizesOnDbOpen(final boolean skipCheckingSstFileSizesOnDbOpen) {
+        dbOptions.setSkipCheckingSstFileSizesOnDbOpen(skipCheckingSstFileSizesOnDbOpen);
+        return this;
+    }
+
+    @Override
+    public boolean skipCheckingSstFileSizesOnDbOpen() {
+        return dbOptions.skipCheckingSstFileSizesOnDbOpen();
+    }
+
+    @Override
+    public Options setWalFilter(final AbstractWalFilter walFilter) {
+        dbOptions.setWalFilter(walFilter);

Review comment:
       Good catch! All those option objects are driving me crazy! 🙂
   Yes, then we could log a warning and ignore the attribute, or we could do nothing since it will not change anything. Throwing an exception seems a bit harsh to me.
   Whatever we decide, I would like to have that in a separate PR.
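   
   For illustration only, the "log a warning and ignore" option could look roughly like the sketch below. `WalIgnoringOptionsSketch` is a hypothetical stand-in, not the actual adapter (which extends `org.rocksdb.Options`), and it collects warnings in a list where a real implementation would presumably use a logger. It assumes, as discussed in this thread, that the WAL is always disabled for writes:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the options adapter; NOT the real Kafka Streams class.
final class WalIgnoringOptionsSketch {
    // Collected warnings; a real implementation would log instead.
    private final List<String> warnings = new ArrayList<>();

    // Never assigned, because WAL-related settings are ignored in this sketch.
    private Object walFilter;

    // "Warn and ignore": record a warning and leave the stored filter untouched.
    WalIgnoringOptionsSketch setWalFilter(final Object ignoredFilter) {
        warnings.add("WAL is disabled, ignoring walFilter");
        return this;
    }

    Object walFilter() {
        return walFilter;
    }

    List<String> warnings() {
        return warnings;
    }
}
```

   Compared with throwing an exception, this keeps existing user configurations working while still making the ignored setting visible.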







[GitHub] [kafka] ableegoldman commented on a change in pull request #10568: KAFKA-8897: Upgrade RocksDB to 6.16.4

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #10568:
URL: https://github.com/apache/kafka/pull/10568#discussion_r626176038



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
@@ -396,6 +396,21 @@ public void close() {
                 log.info("Skipping to close non-initialized store {}", entry.getKey());
             }
         }
+        for (final StateStore store : globalStateStores) {

Review comment:
       Heh, @guozhangwang and I reviewed at the same time. I didn't notice that `globalStores` would not be populated until restoration, whereas `globalStateStores` is populated in the constructor. IMO we should just populate `globalStores` in the constructor as well, but I guess that won't be necessary if @guozhangwang does a quick follow-up to consolidate them.







[GitHub] [kafka] ableegoldman commented on a change in pull request #10568: KAFKA-8897: Upgrade RocksDB to 6.19.3

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #10568:
URL: https://github.com/apache/kafka/pull/10568#discussion_r629637264



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java
##########
@@ -1381,6 +1364,313 @@ public WriteBufferManager writeBufferManager() {
         return dbOptions.writeBufferManager();
     }
 
+    @Override
+    public Options setMaxWriteBatchGroupSizeBytes(final long maxWriteBatchGroupSizeBytes) {
+        dbOptions.setMaxWriteBatchGroupSizeBytes(maxWriteBatchGroupSizeBytes);
+        return this;
+    }
+
+    @Override
+    public long maxWriteBatchGroupSizeBytes() {
+        return dbOptions.maxWriteBatchGroupSizeBytes();
+    }
+
+    @Override
+    public Options oldDefaults(final int majorVersion, final int minorVersion) {
+        columnFamilyOptions.oldDefaults(majorVersion, minorVersion);
+        return this;
+    }
+
+    @Override
+    public Options optimizeForSmallDb(final Cache cache) {
+        return super.optimizeForSmallDb(cache);
+    }
+
+    @Override
+    public AbstractCompactionFilter<? extends AbstractSlice<?>> compactionFilter() {
+        return columnFamilyOptions.compactionFilter();
+    }
+
+    @Override
+    public AbstractCompactionFilterFactory<? extends AbstractCompactionFilter<?>> compactionFilterFactory() {
+        return columnFamilyOptions.compactionFilterFactory();
+    }
+
+    @Override
+    public Options setStatsPersistPeriodSec(final int statsPersistPeriodSec) {
+        dbOptions.setStatsPersistPeriodSec(statsPersistPeriodSec);
+        return this;
+    }
+
+    @Override
+    public int statsPersistPeriodSec() {
+        return dbOptions.statsPersistPeriodSec();
+    }
+
+    @Override
+    public Options setStatsHistoryBufferSize(final long statsHistoryBufferSize) {
+        dbOptions.setStatsHistoryBufferSize(statsHistoryBufferSize);
+        return this;
+    }
+
+    @Override
+    public long statsHistoryBufferSize() {
+        return dbOptions.statsHistoryBufferSize();
+    }
+
+    @Override
+    public Options setStrictBytesPerSync(final boolean strictBytesPerSync) {
+        dbOptions.setStrictBytesPerSync(strictBytesPerSync);
+        return this;
+    }
+
+    @Override
+    public boolean strictBytesPerSync() {
+        return dbOptions.strictBytesPerSync();
+    }
+
+    @Override
+    public Options setListeners(final List<AbstractEventListener> listeners) {
+        dbOptions.setListeners(listeners);
+        return this;
+    }
+
+    @Override
+    public List<AbstractEventListener> listeners() {
+        return dbOptions.listeners();
+    }
+
+    @Override
+    public Options setEnablePipelinedWrite(final boolean enablePipelinedWrite) {
+        dbOptions.setEnablePipelinedWrite(enablePipelinedWrite);
+        return this;
+    }
+
+    @Override
+    public boolean enablePipelinedWrite() {
+        return dbOptions.enablePipelinedWrite();
+    }
+
+    @Override
+    public Options setUnorderedWrite(final boolean unorderedWrite) {
+        dbOptions.setUnorderedWrite(unorderedWrite);
+        return this;
+    }
+
+    @Override
+    public boolean unorderedWrite() {
+        return dbOptions.unorderedWrite();
+    }
+
+    @Override
+    public Options setSkipCheckingSstFileSizesOnDbOpen(final boolean skipCheckingSstFileSizesOnDbOpen) {
+        dbOptions.setSkipCheckingSstFileSizesOnDbOpen(skipCheckingSstFileSizesOnDbOpen);
+        return this;
+    }
+
+    @Override
+    public boolean skipCheckingSstFileSizesOnDbOpen() {
+        return dbOptions.skipCheckingSstFileSizesOnDbOpen();
+    }
+
+    @Override
+    public Options setWalFilter(final AbstractWalFilter walFilter) {
+        dbOptions.setWalFilter(walFilter);
+        return this;
+    }
+
+    @Override
+    public WalFilter walFilter() {
+        return dbOptions.walFilter();
+    }
+
+    @Override
+    public Options setAllowIngestBehind(final boolean allowIngestBehind) {
+        dbOptions.setAllowIngestBehind(allowIngestBehind);
+        return this;
+    }
+
+    @Override
+    public boolean allowIngestBehind() {
+        return dbOptions.allowIngestBehind();
+    }
+
+    @Override
+    public Options setPreserveDeletes(final boolean preserveDeletes) {
+        dbOptions.setPreserveDeletes(preserveDeletes);
+        return this;
+    }
+
+    @Override
+    public boolean preserveDeletes() {
+        return dbOptions.preserveDeletes();
+    }
+
+    @Override
+    public Options setTwoWriteQueues(final boolean twoWriteQueues) {
+        dbOptions.setTwoWriteQueues(twoWriteQueues);
+        return this;
+    }
+
+    @Override
+    public boolean twoWriteQueues() {
+        return dbOptions.twoWriteQueues();
+    }
+
+    @Override
+    public Options setManualWalFlush(final boolean manualWalFlush) {
+        dbOptions.setManualWalFlush(manualWalFlush);
+        return this;
+    }
+
+    @Override
+    public boolean manualWalFlush() {
+        return dbOptions.manualWalFlush();
+    }
+
+    @Override
+    public Options setCfPaths(final Collection<DbPath> cfPaths) {
+        columnFamilyOptions.setCfPaths(cfPaths);
+        return this;
+    }
+
+    @Override
+    public List<DbPath> cfPaths() {
+        return columnFamilyOptions.cfPaths();
+    }
+
+    @Override
+    public Options setBottommostCompressionOptions(final CompressionOptions bottommostCompressionOptions) {
+        columnFamilyOptions.setBottommostCompressionOptions(bottommostCompressionOptions);
+        return this;
+    }
+
+    @Override
+    public CompressionOptions bottommostCompressionOptions() {
+        return columnFamilyOptions.bottommostCompressionOptions();
+    }
+
+    @Override
+    public Options setTtl(final long ttl) {
+        columnFamilyOptions.setTtl(ttl);

Review comment:
       Fair enough. FWIW, I did not mean to literally log a warning; I guess that word is quite overloaded. I just meant we should be wary of users trying to use it in this way, which is pretty much exactly what Guozhang said with
   
   > "so if someone asked "why setting TTL does not work in Streams rocksDB" we would refer them to read this first :)"







[GitHub] [kafka] cadonna commented on a change in pull request #10568: KAFKA-8897: Upgrade RocksDB to 6.19.3

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #10568:
URL: https://github.com/apache/kafka/pull/10568#discussion_r629541422



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java
##########
@@ -1381,6 +1364,313 @@ public WriteBufferManager writeBufferManager() {
         return dbOptions.writeBufferManager();
     }
 
+    @Override
+    public Options setMaxWriteBatchGroupSizeBytes(final long maxWriteBatchGroupSizeBytes) {
+        dbOptions.setMaxWriteBatchGroupSizeBytes(maxWriteBatchGroupSizeBytes);
+        return this;
+    }
+
+    @Override
+    public long maxWriteBatchGroupSizeBytes() {
+        return dbOptions.maxWriteBatchGroupSizeBytes();
+    }
+
+    @Override
+    public Options oldDefaults(final int majorVersion, final int minorVersion) {
+        columnFamilyOptions.oldDefaults(majorVersion, minorVersion);
+        return this;
+    }
+
+    @Override
+    public Options optimizeForSmallDb(final Cache cache) {
+        return super.optimizeForSmallDb(cache);
+    }
+
+    @Override
+    public AbstractCompactionFilter<? extends AbstractSlice<?>> compactionFilter() {
+        return columnFamilyOptions.compactionFilter();
+    }
+
+    @Override
+    public AbstractCompactionFilterFactory<? extends AbstractCompactionFilter<?>> compactionFilterFactory() {
+        return columnFamilyOptions.compactionFilterFactory();
+    }
+
+    @Override
+    public Options setStatsPersistPeriodSec(final int statsPersistPeriodSec) {
+        dbOptions.setStatsPersistPeriodSec(statsPersistPeriodSec);
+        return this;
+    }
+
+    @Override
+    public int statsPersistPeriodSec() {
+        return dbOptions.statsPersistPeriodSec();
+    }
+
+    @Override
+    public Options setStatsHistoryBufferSize(final long statsHistoryBufferSize) {
+        dbOptions.setStatsHistoryBufferSize(statsHistoryBufferSize);
+        return this;
+    }
+
+    @Override
+    public long statsHistoryBufferSize() {
+        return dbOptions.statsHistoryBufferSize();
+    }
+
+    @Override
+    public Options setStrictBytesPerSync(final boolean strictBytesPerSync) {
+        dbOptions.setStrictBytesPerSync(strictBytesPerSync);
+        return this;
+    }
+
+    @Override
+    public boolean strictBytesPerSync() {
+        return dbOptions.strictBytesPerSync();
+    }
+
+    @Override
+    public Options setListeners(final List<AbstractEventListener> listeners) {
+        dbOptions.setListeners(listeners);
+        return this;
+    }
+
+    @Override
+    public List<AbstractEventListener> listeners() {
+        return dbOptions.listeners();
+    }
+
+    @Override
+    public Options setEnablePipelinedWrite(final boolean enablePipelinedWrite) {
+        dbOptions.setEnablePipelinedWrite(enablePipelinedWrite);
+        return this;
+    }
+
+    @Override
+    public boolean enablePipelinedWrite() {
+        return dbOptions.enablePipelinedWrite();
+    }
+
+    @Override
+    public Options setUnorderedWrite(final boolean unorderedWrite) {
+        dbOptions.setUnorderedWrite(unorderedWrite);
+        return this;
+    }
+
+    @Override
+    public boolean unorderedWrite() {
+        return dbOptions.unorderedWrite();
+    }
+
+    @Override
+    public Options setSkipCheckingSstFileSizesOnDbOpen(final boolean skipCheckingSstFileSizesOnDbOpen) {
+        dbOptions.setSkipCheckingSstFileSizesOnDbOpen(skipCheckingSstFileSizesOnDbOpen);
+        return this;
+    }
+
+    @Override
+    public boolean skipCheckingSstFileSizesOnDbOpen() {
+        return dbOptions.skipCheckingSstFileSizesOnDbOpen();
+    }
+
+    @Override
+    public Options setWalFilter(final AbstractWalFilter walFilter) {
+        dbOptions.setWalFilter(walFilter);
+        return this;
+    }
+
+    @Override
+    public WalFilter walFilter() {
+        return dbOptions.walFilter();
+    }
+
+    @Override
+    public Options setAllowIngestBehind(final boolean allowIngestBehind) {
+        dbOptions.setAllowIngestBehind(allowIngestBehind);
+        return this;
+    }
+
+    @Override
+    public boolean allowIngestBehind() {
+        return dbOptions.allowIngestBehind();
+    }
+
+    @Override
+    public Options setPreserveDeletes(final boolean preserveDeletes) {
+        dbOptions.setPreserveDeletes(preserveDeletes);
+        return this;
+    }
+
+    @Override
+    public boolean preserveDeletes() {
+        return dbOptions.preserveDeletes();
+    }
+
+    @Override
+    public Options setTwoWriteQueues(final boolean twoWriteQueues) {
+        dbOptions.setTwoWriteQueues(twoWriteQueues);
+        return this;
+    }
+
+    @Override
+    public boolean twoWriteQueues() {
+        return dbOptions.twoWriteQueues();
+    }
+
+    @Override
+    public Options setManualWalFlush(final boolean manualWalFlush) {
+        dbOptions.setManualWalFlush(manualWalFlush);
+        return this;
+    }
+
+    @Override
+    public boolean manualWalFlush() {
+        return dbOptions.manualWalFlush();
+    }
+
+    @Override
+    public Options setCfPaths(final Collection<DbPath> cfPaths) {
+        columnFamilyOptions.setCfPaths(cfPaths);
+        return this;
+    }
+
+    @Override
+    public List<DbPath> cfPaths() {
+        return columnFamilyOptions.cfPaths();
+    }
+
+    @Override
+    public Options setBottommostCompressionOptions(final CompressionOptions bottommostCompressionOptions) {
+        columnFamilyOptions.setBottommostCompressionOptions(bottommostCompressionOptions);
+        return this;
+    }
+
+    @Override
+    public CompressionOptions bottommostCompressionOptions() {
+        return columnFamilyOptions.bottommostCompressionOptions();
+    }
+
+    @Override
+    public Options setTtl(final long ttl) {
+        columnFamilyOptions.setTtl(ttl);

Review comment:
       I agree with @guozhangwang.







[GitHub] [kafka] ableegoldman commented on a change in pull request #10568: KAFKA-8897: Upgrade RocksDB to 6.19.3

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #10568:
URL: https://github.com/apache/kafka/pull/10568#discussion_r626971137



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java
##########
@@ -1381,6 +1364,313 @@ public WriteBufferManager writeBufferManager() {
         return dbOptions.writeBufferManager();
     }
 
+    @Override
+    public Options setMaxWriteBatchGroupSizeBytes(final long maxWriteBatchGroupSizeBytes) {
+        dbOptions.setMaxWriteBatchGroupSizeBytes(maxWriteBatchGroupSizeBytes);
+        return this;
+    }
+
+    @Override
+    public long maxWriteBatchGroupSizeBytes() {
+        return dbOptions.maxWriteBatchGroupSizeBytes();
+    }
+
+    @Override
+    public Options oldDefaults(final int majorVersion, final int minorVersion) {
+        columnFamilyOptions.oldDefaults(majorVersion, minorVersion);
+        return this;
+    }
+
+    @Override
+    public Options optimizeForSmallDb(final Cache cache) {
+        return super.optimizeForSmallDb(cache);
+    }
+
+    @Override
+    public AbstractCompactionFilter<? extends AbstractSlice<?>> compactionFilter() {
+        return columnFamilyOptions.compactionFilter();
+    }
+
+    @Override
+    public AbstractCompactionFilterFactory<? extends AbstractCompactionFilter<?>> compactionFilterFactory() {
+        return columnFamilyOptions.compactionFilterFactory();
+    }
+
+    @Override
+    public Options setStatsPersistPeriodSec(final int statsPersistPeriodSec) {
+        dbOptions.setStatsPersistPeriodSec(statsPersistPeriodSec);
+        return this;
+    }
+
+    @Override
+    public int statsPersistPeriodSec() {
+        return dbOptions.statsPersistPeriodSec();
+    }
+
+    @Override
+    public Options setStatsHistoryBufferSize(final long statsHistoryBufferSize) {
+        dbOptions.setStatsHistoryBufferSize(statsHistoryBufferSize);
+        return this;
+    }
+
+    @Override
+    public long statsHistoryBufferSize() {
+        return dbOptions.statsHistoryBufferSize();
+    }
+
+    @Override
+    public Options setStrictBytesPerSync(final boolean strictBytesPerSync) {
+        dbOptions.setStrictBytesPerSync(strictBytesPerSync);
+        return this;
+    }
+
+    @Override
+    public boolean strictBytesPerSync() {
+        return dbOptions.strictBytesPerSync();
+    }
+
+    @Override
+    public Options setListeners(final List<AbstractEventListener> listeners) {
+        dbOptions.setListeners(listeners);
+        return this;
+    }
+
+    @Override
+    public List<AbstractEventListener> listeners() {
+        return dbOptions.listeners();
+    }
+
+    @Override
+    public Options setEnablePipelinedWrite(final boolean enablePipelinedWrite) {
+        dbOptions.setEnablePipelinedWrite(enablePipelinedWrite);
+        return this;
+    }
+
+    @Override
+    public boolean enablePipelinedWrite() {
+        return dbOptions.enablePipelinedWrite();
+    }
+
+    @Override
+    public Options setUnorderedWrite(final boolean unorderedWrite) {
+        dbOptions.setUnorderedWrite(unorderedWrite);
+        return this;
+    }
+
+    @Override
+    public boolean unorderedWrite() {
+        return dbOptions.unorderedWrite();
+    }
+
+    @Override
+    public Options setSkipCheckingSstFileSizesOnDbOpen(final boolean skipCheckingSstFileSizesOnDbOpen) {
+        dbOptions.setSkipCheckingSstFileSizesOnDbOpen(skipCheckingSstFileSizesOnDbOpen);
+        return this;
+    }
+
+    @Override
+    public boolean skipCheckingSstFileSizesOnDbOpen() {
+        return dbOptions.skipCheckingSstFileSizesOnDbOpen();
+    }
+
+    @Override
+    public Options setWalFilter(final AbstractWalFilter walFilter) {
+        dbOptions.setWalFilter(walFilter);
+        return this;
+    }
+
+    @Override
+    public WalFilter walFilter() {
+        return dbOptions.walFilter();
+    }
+
+    @Override
+    public Options setAllowIngestBehind(final boolean allowIngestBehind) {
+        dbOptions.setAllowIngestBehind(allowIngestBehind);
+        return this;
+    }
+
+    @Override
+    public boolean allowIngestBehind() {
+        return dbOptions.allowIngestBehind();
+    }
+
+    @Override
+    public Options setPreserveDeletes(final boolean preserveDeletes) {
+        dbOptions.setPreserveDeletes(preserveDeletes);
+        return this;
+    }
+
+    @Override
+    public boolean preserveDeletes() {
+        return dbOptions.preserveDeletes();
+    }
+
+    @Override
+    public Options setTwoWriteQueues(final boolean twoWriteQueues) {
+        dbOptions.setTwoWriteQueues(twoWriteQueues);
+        return this;
+    }
+
+    @Override
+    public boolean twoWriteQueues() {
+        return dbOptions.twoWriteQueues();
+    }
+
+    @Override
+    public Options setManualWalFlush(final boolean manualWalFlush) {
+        dbOptions.setManualWalFlush(manualWalFlush);
+        return this;
+    }
+
+    @Override
+    public boolean manualWalFlush() {
+        return dbOptions.manualWalFlush();
+    }
+
+    @Override
+    public Options setCfPaths(final Collection<DbPath> cfPaths) {
+        columnFamilyOptions.setCfPaths(cfPaths);
+        return this;
+    }
+
+    @Override
+    public List<DbPath> cfPaths() {
+        return columnFamilyOptions.cfPaths();
+    }
+
+    @Override
+    public Options setBottommostCompressionOptions(final CompressionOptions bottommostCompressionOptions) {
+        columnFamilyOptions.setBottommostCompressionOptions(bottommostCompressionOptions);
+        return this;
+    }
+
+    @Override
+    public CompressionOptions bottommostCompressionOptions() {
+        return columnFamilyOptions.bottommostCompressionOptions();
+    }
+
+    @Override
+    public Options setTtl(final long ttl) {
+        columnFamilyOptions.setTtl(ttl);

Review comment:
       To clarify, I was definitely not recommending that we test arbitrary combinations or anything beyond this one case where I thought there might be a conflict. It sounds like they just added a feature that lets ttl work with non-FIFO compaction, which will make many KV users incredibly happy. We may want to leave a note on the ticket about a ttl KeyValueStore that has collected so many +1's from users.







[GitHub] [kafka] cadonna edited a comment on pull request #10568: KAFKA-8897: Upgrade RocksDB to 6.19.3

Posted by GitBox <gi...@apache.org>.
cadonna edited a comment on pull request #10568:
URL: https://github.com/apache/kafka/pull/10568#issuecomment-833427130


   The system tests job: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4495/





[GitHub] [kafka] cadonna commented on a change in pull request #10568: KAFKA-8897: Upgrade RocksDB to 6.16.4

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #10568:
URL: https://github.com/apache/kafka/pull/10568#discussion_r626508070



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java
##########
@@ -1381,6 +1364,313 @@ public WriteBufferManager writeBufferManager() {
         return dbOptions.writeBufferManager();
     }
 
+    @Override
+    public Options setMaxWriteBatchGroupSizeBytes(final long maxWriteBatchGroupSizeBytes) {
+        dbOptions.setMaxWriteBatchGroupSizeBytes(maxWriteBatchGroupSizeBytes);
+        return this;
+    }
+
+    @Override
+    public long maxWriteBatchGroupSizeBytes() {
+        return dbOptions.maxWriteBatchGroupSizeBytes();
+    }
+
+    @Override
+    public Options oldDefaults(final int majorVersion, final int minorVersion) {
+        columnFamilyOptions.oldDefaults(majorVersion, minorVersion);
+        return this;
+    }
+
+    @Override
+    public Options optimizeForSmallDb(final Cache cache) {
+        return super.optimizeForSmallDb(cache);
+    }
+
+    @Override
+    public AbstractCompactionFilter<? extends AbstractSlice<?>> compactionFilter() {
+        return columnFamilyOptions.compactionFilter();
+    }
+
+    @Override
+    public AbstractCompactionFilterFactory<? extends AbstractCompactionFilter<?>> compactionFilterFactory() {
+        return columnFamilyOptions.compactionFilterFactory();
+    }
+
+    @Override
+    public Options setStatsPersistPeriodSec(final int statsPersistPeriodSec) {
+        dbOptions.setStatsPersistPeriodSec(statsPersistPeriodSec);
+        return this;
+    }
+
+    @Override
+    public int statsPersistPeriodSec() {
+        return dbOptions.statsPersistPeriodSec();
+    }
+
+    @Override
+    public Options setStatsHistoryBufferSize(final long statsHistoryBufferSize) {
+        dbOptions.setStatsHistoryBufferSize(statsHistoryBufferSize);
+        return this;
+    }
+
+    @Override
+    public long statsHistoryBufferSize() {
+        return dbOptions.statsHistoryBufferSize();
+    }
+
+    @Override
+    public Options setStrictBytesPerSync(final boolean strictBytesPerSync) {
+        dbOptions.setStrictBytesPerSync(strictBytesPerSync);
+        return this;
+    }
+
+    @Override
+    public boolean strictBytesPerSync() {
+        return dbOptions.strictBytesPerSync();
+    }
+
+    @Override
+    public Options setListeners(final List<AbstractEventListener> listeners) {
+        dbOptions.setListeners(listeners);
+        return this;
+    }
+
+    @Override
+    public List<AbstractEventListener> listeners() {
+        return dbOptions.listeners();
+    }
+
+    @Override
+    public Options setEnablePipelinedWrite(final boolean enablePipelinedWrite) {
+        dbOptions.setEnablePipelinedWrite(enablePipelinedWrite);
+        return this;
+    }
+
+    @Override
+    public boolean enablePipelinedWrite() {
+        return dbOptions.enablePipelinedWrite();
+    }
+
+    @Override
+    public Options setUnorderedWrite(final boolean unorderedWrite) {
+        dbOptions.setUnorderedWrite(unorderedWrite);
+        return this;
+    }
+
+    @Override
+    public boolean unorderedWrite() {
+        return dbOptions.unorderedWrite();
+    }
+
+    @Override
+    public Options setSkipCheckingSstFileSizesOnDbOpen(final boolean skipCheckingSstFileSizesOnDbOpen) {
+        dbOptions.setSkipCheckingSstFileSizesOnDbOpen(skipCheckingSstFileSizesOnDbOpen);
+        return this;
+    }
+
+    @Override
+    public boolean skipCheckingSstFileSizesOnDbOpen() {
+        return dbOptions.skipCheckingSstFileSizesOnDbOpen();
+    }
+
+    @Override
+    public Options setWalFilter(final AbstractWalFilter walFilter) {
+        dbOptions.setWalFilter(walFilter);
+        return this;
+    }
+
+    @Override
+    public WalFilter walFilter() {
+        return dbOptions.walFilter();
+    }
+
+    @Override
+    public Options setAllowIngestBehind(final boolean allowIngestBehind) {
+        dbOptions.setAllowIngestBehind(allowIngestBehind);
+        return this;
+    }
+
+    @Override
+    public boolean allowIngestBehind() {
+        return dbOptions.allowIngestBehind();
+    }
+
+    @Override
+    public Options setPreserveDeletes(final boolean preserveDeletes) {
+        dbOptions.setPreserveDeletes(preserveDeletes);
+        return this;
+    }
+
+    @Override
+    public boolean preserveDeletes() {
+        return dbOptions.preserveDeletes();
+    }
+
+    @Override
+    public Options setTwoWriteQueues(final boolean twoWriteQueues) {
+        dbOptions.setTwoWriteQueues(twoWriteQueues);
+        return this;
+    }
+
+    @Override
+    public boolean twoWriteQueues() {
+        return dbOptions.twoWriteQueues();
+    }
+
+    @Override
+    public Options setManualWalFlush(final boolean manualWalFlush) {
+        dbOptions.setManualWalFlush(manualWalFlush);
+        return this;
+    }
+
+    @Override
+    public boolean manualWalFlush() {
+        return dbOptions.manualWalFlush();
+    }
+
+    @Override
+    public Options setCfPaths(final Collection<DbPath> cfPaths) {
+        columnFamilyOptions.setCfPaths(cfPaths);
+        return this;
+    }
+
+    @Override
+    public List<DbPath> cfPaths() {
+        return columnFamilyOptions.cfPaths();
+    }
+
+    @Override
+    public Options setBottommostCompressionOptions(final CompressionOptions bottommostCompressionOptions) {
+        columnFamilyOptions.setBottommostCompressionOptions(bottommostCompressionOptions);
+        return this;
+    }
+
+    @Override
+    public CompressionOptions bottommostCompressionOptions() {
+        return columnFamilyOptions.bottommostCompressionOptions();
+    }
+
+    @Override
+    public Options setTtl(final long ttl) {
+        columnFamilyOptions.setTtl(ttl);

Review comment:
       I quickly tested it by setting the compaction style to universal and setting the ttl and could not see any exception. 







[GitHub] [kafka] ableegoldman commented on a change in pull request #10568: KAFKA-8897: Upgrade RocksDB to 6.16.4

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #10568:
URL: https://github.com/apache/kafka/pull/10568#discussion_r626171667



##########
File path: gradle/dependencies.gradle
##########
@@ -103,7 +103,7 @@ versions += [
   netty: "4.1.62.Final",
   powermock: "2.0.9",
   reflections: "0.9.12",
-  rocksDB: "5.18.4",
+  rocksDB: "6.16.4",

Review comment:
       Guess I celebrated too early...hope you find the remaining leak without much trouble (and that it is indeed a leak and not an unavoidable bug of some kind, though that seems very unlikely)







[GitHub] [kafka] cadonna edited a comment on pull request #10568: KAFKA-8897: Upgrade RocksDB to 6.19.3

Posted by GitBox <gi...@apache.org>.
cadonna edited a comment on pull request #10568:
URL: https://github.com/apache/kafka/pull/10568#issuecomment-833427130


   The system tests job: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4497/





[GitHub] [kafka] ableegoldman commented on a change in pull request #10568: KAFKA-8897: Upgrade RocksDB to 6.19.3

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #10568:
URL: https://github.com/apache/kafka/pull/10568#discussion_r628815666



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java
##########
@@ -1381,6 +1364,313 @@ public WriteBufferManager writeBufferManager() {
         return dbOptions.writeBufferManager();
     }
 
+    @Override
+    public Options setMaxWriteBatchGroupSizeBytes(final long maxWriteBatchGroupSizeBytes) {
+        dbOptions.setMaxWriteBatchGroupSizeBytes(maxWriteBatchGroupSizeBytes);
+        return this;
+    }
+
+    @Override
+    public long maxWriteBatchGroupSizeBytes() {
+        return dbOptions.maxWriteBatchGroupSizeBytes();
+    }
+
+    @Override
+    public Options oldDefaults(final int majorVersion, final int minorVersion) {
+        columnFamilyOptions.oldDefaults(majorVersion, minorVersion);
+        return this;
+    }
+
+    @Override
+    public Options optimizeForSmallDb(final Cache cache) {
+        return super.optimizeForSmallDb(cache);
+    }
+
+    @Override
+    public AbstractCompactionFilter<? extends AbstractSlice<?>> compactionFilter() {
+        return columnFamilyOptions.compactionFilter();
+    }
+
+    @Override
+    public AbstractCompactionFilterFactory<? extends AbstractCompactionFilter<?>> compactionFilterFactory() {
+        return columnFamilyOptions.compactionFilterFactory();
+    }
+
+    @Override
+    public Options setStatsPersistPeriodSec(final int statsPersistPeriodSec) {
+        dbOptions.setStatsPersistPeriodSec(statsPersistPeriodSec);
+        return this;
+    }
+
+    @Override
+    public int statsPersistPeriodSec() {
+        return dbOptions.statsPersistPeriodSec();
+    }
+
+    @Override
+    public Options setStatsHistoryBufferSize(final long statsHistoryBufferSize) {
+        dbOptions.setStatsHistoryBufferSize(statsHistoryBufferSize);
+        return this;
+    }
+
+    @Override
+    public long statsHistoryBufferSize() {
+        return dbOptions.statsHistoryBufferSize();
+    }
+
+    @Override
+    public Options setStrictBytesPerSync(final boolean strictBytesPerSync) {
+        dbOptions.setStrictBytesPerSync(strictBytesPerSync);
+        return this;
+    }
+
+    @Override
+    public boolean strictBytesPerSync() {
+        return dbOptions.strictBytesPerSync();
+    }
+
+    @Override
+    public Options setListeners(final List<AbstractEventListener> listeners) {
+        dbOptions.setListeners(listeners);
+        return this;
+    }
+
+    @Override
+    public List<AbstractEventListener> listeners() {
+        return dbOptions.listeners();
+    }
+
+    @Override
+    public Options setEnablePipelinedWrite(final boolean enablePipelinedWrite) {
+        dbOptions.setEnablePipelinedWrite(enablePipelinedWrite);
+        return this;
+    }
+
+    @Override
+    public boolean enablePipelinedWrite() {
+        return dbOptions.enablePipelinedWrite();
+    }
+
+    @Override
+    public Options setUnorderedWrite(final boolean unorderedWrite) {
+        dbOptions.setUnorderedWrite(unorderedWrite);
+        return this;
+    }
+
+    @Override
+    public boolean unorderedWrite() {
+        return dbOptions.unorderedWrite();
+    }
+
+    @Override
+    public Options setSkipCheckingSstFileSizesOnDbOpen(final boolean skipCheckingSstFileSizesOnDbOpen) {
+        dbOptions.setSkipCheckingSstFileSizesOnDbOpen(skipCheckingSstFileSizesOnDbOpen);
+        return this;
+    }
+
+    @Override
+    public boolean skipCheckingSstFileSizesOnDbOpen() {
+        return dbOptions.skipCheckingSstFileSizesOnDbOpen();
+    }
+
+    @Override
+    public Options setWalFilter(final AbstractWalFilter walFilter) {
+        dbOptions.setWalFilter(walFilter);
+        return this;
+    }
+
+    @Override
+    public WalFilter walFilter() {
+        return dbOptions.walFilter();
+    }
+
+    @Override
+    public Options setAllowIngestBehind(final boolean allowIngestBehind) {
+        dbOptions.setAllowIngestBehind(allowIngestBehind);
+        return this;
+    }
+
+    @Override
+    public boolean allowIngestBehind() {
+        return dbOptions.allowIngestBehind();
+    }
+
+    @Override
+    public Options setPreserveDeletes(final boolean preserveDeletes) {
+        dbOptions.setPreserveDeletes(preserveDeletes);
+        return this;
+    }
+
+    @Override
+    public boolean preserveDeletes() {
+        return dbOptions.preserveDeletes();
+    }
+
+    @Override
+    public Options setTwoWriteQueues(final boolean twoWriteQueues) {
+        dbOptions.setTwoWriteQueues(twoWriteQueues);
+        return this;
+    }
+
+    @Override
+    public boolean twoWriteQueues() {
+        return dbOptions.twoWriteQueues();
+    }
+
+    @Override
+    public Options setManualWalFlush(final boolean manualWalFlush) {
+        dbOptions.setManualWalFlush(manualWalFlush);
+        return this;
+    }
+
+    @Override
+    public boolean manualWalFlush() {
+        return dbOptions.manualWalFlush();
+    }
+
+    @Override
+    public Options setCfPaths(final Collection<DbPath> cfPaths) {
+        columnFamilyOptions.setCfPaths(cfPaths);
+        return this;
+    }
+
+    @Override
+    public List<DbPath> cfPaths() {
+        return columnFamilyOptions.cfPaths();
+    }
+
+    @Override
+    public Options setBottommostCompressionOptions(final CompressionOptions bottommostCompressionOptions) {
+        columnFamilyOptions.setBottommostCompressionOptions(bottommostCompressionOptions);
+        return this;
+    }
+
+    @Override
+    public CompressionOptions bottommostCompressionOptions() {
+        return columnFamilyOptions.bottommostCompressionOptions();
+    }
+
+    @Override
+    public Options setTtl(final long ttl) {
+        columnFamilyOptions.setTtl(ttl);

Review comment:
       Oh, I guess I didn't read it very carefully 😅 In that case we may want to warn users that this option will not, in fact, provide "real" ttl (unless also using FIFO compaction). I'd imagine users may see this new option pop up and assume, as I did, that it means deleting files older than the ttl, i.e. the definition of ttl...
   
   Also, not totally sure I understand -- does that mean _only_ files older than the ttl will be compacted, or just that if a file remains uncompacted for longer than the ttl it will be "prematurely" compacted? I assume the latter...
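
To make the distinction concrete, here is a toy model of the two readings discussed above. It is only a sketch under the assumption stated in this comment (FIFO compaction drops files older than the ttl, while other compaction styles merely pick such files for compaction and keep their live data). `TtlSketch`, `SstFile`, `Style`, and all method names are hypothetical and are not part of RocksDB or Kafka Streams.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: none of these names exist in RocksDB or Kafka Streams.
final class TtlSketch {
    enum Style { LEVEL, FIFO }

    // Hypothetical stand-in for an SST file: a name plus its age in seconds.
    static final class SstFile {
        final String name;
        final long ageSec;

        SstFile(final String name, final long ageSec) {
            this.name = name;
            this.ageSec = ageSec;
        }
    }

    // Names of all files whose age exceeds the ttl, in input order.
    static List<String> olderThanTtl(final List<SstFile> files, final long ttlSec) {
        final List<String> result = new ArrayList<String>();
        for (final SstFile file : files) {
            if (file.ageSec > ttlSec) {
                result.add(file.name);
            }
        }
        return result;
    }

    // Assumed FIFO behavior: expired files are dropped outright ("real" ttl).
    static List<String> deleted(final List<SstFile> files, final long ttlSec, final Style style) {
        return style == Style.FIFO ? olderThanTtl(files, ttlSec) : new ArrayList<String>();
    }

    // Assumed LEVEL behavior: expired files are only picked for compaction; nothing is dropped.
    static List<String> scheduledForCompaction(final List<SstFile> files, final long ttlSec, final Style style) {
        return style == Style.LEVEL ? olderThanTtl(files, ttlSec) : new ArrayList<String>();
    }
}
```

In this model a file that is older than the ttl disappears only under `Style.FIFO`. Under `Style.LEVEL` it is just rewritten earlier than it otherwise would be, which is why the option name could mislead users.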
   
   







[GitHub] [kafka] guozhangwang commented on a change in pull request #10568: KAFKA-8897: Upgrade RocksDB to 6.19.3

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #10568:
URL: https://github.com/apache/kafka/pull/10568#discussion_r626735704



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
##########
@@ -189,16 +199,6 @@ public void shouldCloseStateStoresOnClose() throws Exception {
         assertFalse(globalStore.isOpen());
     }
 
-    @Test
-    public void shouldTransitionToDeadOnClose() throws Exception {

Review comment:
       Thx!







[GitHub] [kafka] cadonna commented on a change in pull request #10568: KAFKA-8897: Upgrade RocksDB to 6.16.4

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #10568:
URL: https://github.com/apache/kafka/pull/10568#discussion_r626497390



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java
##########
@@ -1381,6 +1364,313 @@ public WriteBufferManager writeBufferManager() {
         return dbOptions.writeBufferManager();
     }
 
+    @Override
+    public Options setMaxWriteBatchGroupSizeBytes(final long maxWriteBatchGroupSizeBytes) {
+        dbOptions.setMaxWriteBatchGroupSizeBytes(maxWriteBatchGroupSizeBytes);
+        return this;
+    }
+
+    @Override
+    public long maxWriteBatchGroupSizeBytes() {
+        return dbOptions.maxWriteBatchGroupSizeBytes();
+    }
+
+    @Override
+    public Options oldDefaults(final int majorVersion, final int minorVersion) {
+        columnFamilyOptions.oldDefaults(majorVersion, minorVersion);
+        return this;
+    }
+
+    @Override
+    public Options optimizeForSmallDb(final Cache cache) {
+        return super.optimizeForSmallDb(cache);
+    }
+
+    @Override
+    public AbstractCompactionFilter<? extends AbstractSlice<?>> compactionFilter() {
+        return columnFamilyOptions.compactionFilter();
+    }
+
+    @Override
+    public AbstractCompactionFilterFactory<? extends AbstractCompactionFilter<?>> compactionFilterFactory() {
+        return columnFamilyOptions.compactionFilterFactory();
+    }
+
+    @Override
+    public Options setStatsPersistPeriodSec(final int statsPersistPeriodSec) {
+        dbOptions.setStatsPersistPeriodSec(statsPersistPeriodSec);
+        return this;
+    }
+
+    @Override
+    public int statsPersistPeriodSec() {
+        return dbOptions.statsPersistPeriodSec();
+    }
+
+    @Override
+    public Options setStatsHistoryBufferSize(final long statsHistoryBufferSize) {
+        dbOptions.setStatsHistoryBufferSize(statsHistoryBufferSize);
+        return this;
+    }
+
+    @Override
+    public long statsHistoryBufferSize() {
+        return dbOptions.statsHistoryBufferSize();
+    }
+
+    @Override
+    public Options setStrictBytesPerSync(final boolean strictBytesPerSync) {
+        dbOptions.setStrictBytesPerSync(strictBytesPerSync);
+        return this;
+    }
+
+    @Override
+    public boolean strictBytesPerSync() {
+        return dbOptions.strictBytesPerSync();
+    }
+
+    @Override
+    public Options setListeners(final List<AbstractEventListener> listeners) {
+        dbOptions.setListeners(listeners);
+        return this;
+    }
+
+    @Override
+    public List<AbstractEventListener> listeners() {
+        return dbOptions.listeners();
+    }
+
+    @Override
+    public Options setEnablePipelinedWrite(final boolean enablePipelinedWrite) {
+        dbOptions.setEnablePipelinedWrite(enablePipelinedWrite);
+        return this;
+    }
+
+    @Override
+    public boolean enablePipelinedWrite() {
+        return dbOptions.enablePipelinedWrite();
+    }
+
+    @Override
+    public Options setUnorderedWrite(final boolean unorderedWrite) {
+        dbOptions.setUnorderedWrite(unorderedWrite);
+        return this;
+    }
+
+    @Override
+    public boolean unorderedWrite() {
+        return dbOptions.unorderedWrite();
+    }
+
+    @Override
+    public Options setSkipCheckingSstFileSizesOnDbOpen(final boolean skipCheckingSstFileSizesOnDbOpen) {
+        dbOptions.setSkipCheckingSstFileSizesOnDbOpen(skipCheckingSstFileSizesOnDbOpen);
+        return this;
+    }
+
+    @Override
+    public boolean skipCheckingSstFileSizesOnDbOpen() {
+        return dbOptions.skipCheckingSstFileSizesOnDbOpen();
+    }
+
+    @Override
+    public Options setWalFilter(final AbstractWalFilter walFilter) {
+        dbOptions.setWalFilter(walFilter);
+        return this;
+    }
+
+    @Override
+    public WalFilter walFilter() {
+        return dbOptions.walFilter();
+    }
+
+    @Override
+    public Options setAllowIngestBehind(final boolean allowIngestBehind) {
+        dbOptions.setAllowIngestBehind(allowIngestBehind);
+        return this;
+    }
+
+    @Override
+    public boolean allowIngestBehind() {
+        return dbOptions.allowIngestBehind();
+    }
+
+    @Override
+    public Options setPreserveDeletes(final boolean preserveDeletes) {
+        dbOptions.setPreserveDeletes(preserveDeletes);
+        return this;
+    }
+
+    @Override
+    public boolean preserveDeletes() {
+        return dbOptions.preserveDeletes();
+    }
+
+    @Override
+    public Options setTwoWriteQueues(final boolean twoWriteQueues) {
+        dbOptions.setTwoWriteQueues(twoWriteQueues);
+        return this;
+    }
+
+    @Override
+    public boolean twoWriteQueues() {
+        return dbOptions.twoWriteQueues();
+    }
+
+    @Override
+    public Options setManualWalFlush(final boolean manualWalFlush) {
+        dbOptions.setManualWalFlush(manualWalFlush);
+        return this;
+    }
+
+    @Override
+    public boolean manualWalFlush() {
+        return dbOptions.manualWalFlush();
+    }
+
+    @Override
+    public Options setCfPaths(final Collection<DbPath> cfPaths) {
+        columnFamilyOptions.setCfPaths(cfPaths);
+        return this;
+    }
+
+    @Override
+    public List<DbPath> cfPaths() {
+        return columnFamilyOptions.cfPaths();
+    }
+
+    @Override
+    public Options setBottommostCompressionOptions(final CompressionOptions bottommostCompressionOptions) {
+        columnFamilyOptions.setBottommostCompressionOptions(bottommostCompressionOptions);
+        return this;
+    }
+
+    @Override
+    public CompressionOptions bottommostCompressionOptions() {
+        return columnFamilyOptions.bottommostCompressionOptions();
+    }
+
+    @Override
+    public Options setTtl(final long ttl) {
+        columnFamilyOptions.setTtl(ttl);
+        return this;
+    }
+
+    @Override
+    public long ttl() {
+        return columnFamilyOptions.ttl();
+    }
+
+    @Override
+    public Options setAtomicFlush(final boolean atomicFlush) {
+        dbOptions.setAtomicFlush(atomicFlush);
+        return this;
+    }
+
+    @Override
+    public boolean atomicFlush() {
+        return dbOptions.atomicFlush();
+    }
+
+    @Override
+    public Options setAvoidUnnecessaryBlockingIO(final boolean avoidUnnecessaryBlockingIO) {
+        dbOptions.setAvoidUnnecessaryBlockingIO(avoidUnnecessaryBlockingIO);
+        return this;
+    }
+
+    @Override
+    public boolean avoidUnnecessaryBlockingIO() {
+        return dbOptions.avoidUnnecessaryBlockingIO();
+    }
+
+    @Override
+    public Options setPersistStatsToDisk(final boolean persistStatsToDisk) {
+        dbOptions.setPersistStatsToDisk(persistStatsToDisk);
+        return this;
+    }
+
+    @Override
+    public boolean persistStatsToDisk() {
+        return dbOptions.persistStatsToDisk();
+    }
+
+    @Override
+    public Options setWriteDbidToManifest(final boolean writeDbidToManifest) {
+        dbOptions.setWriteDbidToManifest(writeDbidToManifest);
+        return this;
+    }
+
+    @Override
+    public boolean writeDbidToManifest() {
+        return dbOptions.writeDbidToManifest();
+    }
+
+    @Override
+    public Options setLogReadaheadSize(final long logReadaheadSize) {
+        dbOptions.setLogReadaheadSize(logReadaheadSize);
+        return this;
+    }
+
+    @Override
+    public long logReadaheadSize() {
+        return dbOptions.logReadaheadSize();
+    }
+
+    @Override
+    public Options setBestEffortsRecovery(final boolean bestEffortsRecovery) {
+        dbOptions.setBestEffortsRecovery(bestEffortsRecovery);
+        return this;
+    }
+
+    @Override
+    public boolean bestEffortsRecovery() {
+        return dbOptions.bestEffortsRecovery();
+    }
+
+    @Override
+    public Options setMaxBgErrorResumeCount(final int maxBgerrorResumeCount) {
+        dbOptions.setMaxBgErrorResumeCount(maxBgerrorResumeCount);
+        return this;
+    }
+
+    @Override
+    public int maxBgerrorResumeCount() {
+        return dbOptions.maxBgerrorResumeCount();
+    }
+
+    @Override
+    public Options setBgerrorResumeRetryInterval(final long bgerrorResumeRetryInterval) {
+        dbOptions.setBgerrorResumeRetryInterval(bgerrorResumeRetryInterval);
+        return this;
+    }
+
+    @Override
+    public long bgerrorResumeRetryInterval() {
+        return dbOptions.bgerrorResumeRetryInterval();
+    }
+
+    @Override
+    public Options setSstPartitionerFactory(final SstPartitionerFactory sstPartitionerFactory) {
+        columnFamilyOptions.setSstPartitionerFactory(sstPartitionerFactory);
+        return this;
+    }
+
+    @Override
+    public SstPartitionerFactory sstPartitionerFactory() {
+        return columnFamilyOptions.sstPartitionerFactory();
+    }
+
+    @Override
+    public Options setCompactionThreadLimiter(final ConcurrentTaskLimiter compactionThreadLimiter) {

Review comment:
       I am not sure either, but the following comment from the RocksDB source suggests that it limits the number of outstanding compactions. So I guess it is about the queue of compactions that are planned and not the number of compactions that are currently being performed. But I might be wrong. If I am correct, then the name is a bit misleading.
   
   ```
     // Compaction concurrent thread limiter for the column family.
     // If non-nullptr, use given concurrent thread limiter to control
     // the max outstanding compaction tasks. Limiter can be shared with
     // multiple column families across db instances.
   ```
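
For illustration, a limiter on "outstanding" tasks that several column families share could look like the toy model below. This is a sketch only: `TaskLimiterSketch` and its methods are hypothetical and are not RocksDB's `ConcurrentTaskLimiter`. It also does not settle whether RocksDB counts planned or running compactions; it only shows what a shared cap on outstanding tasks means.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Toy model only: this is not RocksDB's ConcurrentTaskLimiter.
final class TaskLimiterSketch {
    private final int maxOutstanding;
    private final AtomicInteger outstanding = new AtomicInteger();

    TaskLimiterSketch(final int maxOutstanding) {
        this.maxOutstanding = maxOutstanding;
    }

    // Reserves a slot for one more task; returns false once the limit is reached.
    boolean tryAcquire() {
        while (true) {
            final int current = outstanding.get();
            if (current >= maxOutstanding) {
                return false;
            }
            // Retry if another thread changed the counter in the meantime.
            if (outstanding.compareAndSet(current, current + 1)) {
                return true;
            }
        }
    }

    // Frees the slot held by a finished task.
    void release() {
        outstanding.decrementAndGet();
    }

    // Number of tasks that currently hold a slot.
    int outstandingTasks() {
        return outstanding.get();
    }
}
```

If two column families hold a reference to the same `TaskLimiterSketch` instance, the cap applies to their combined number of outstanding tasks, which matches the "can be shared with multiple column families" part of the quoted comment.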



