You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/05/04 23:24:10 UTC

[GitHub] [kafka] ableegoldman commented on a change in pull request #10568: KAFKA-8897: Upgrade RocksDB to 6.16.4

ableegoldman commented on a change in pull request #10568:
URL: https://github.com/apache/kafka/pull/10568#discussion_r626056309



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
##########
@@ -411,13 +408,25 @@ private StateConsumer initialize() {
 
             return stateConsumer;
         } catch (final StreamsException fatalException) {
+            closeStateConsumer(stateConsumer, false);

Review comment:
       Ah, so we were also possibly leaking stores in case of a StreamsException or other Exception? Yikes

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
@@ -396,6 +396,21 @@ public void close() {
                 log.info("Skipping to close non-initialized store {}", entry.getKey());
             }
         }
+        for (final StateStore store : globalStateStores) {

Review comment:
       Yikes, so it actually was a (possible) leak in production code? I'm almost glad rocksdb had a bug, otherwise we might have never caught it 😅 
   
   Definite +1 on consolidating these, can you file a quick ticket for this? It almost sounds like a bug in itself that one might contain stores that the other does not, but I suppose as long as they all get closed it's probably ok. 
   
   I'd bet you could find a community member to pick it up by labeling as `newbie` or `newbie++`

##########
File path: gradle/dependencies.gradle
##########
@@ -103,7 +103,7 @@ versions += [
   netty: "4.1.62.Final",
   powermock: "2.0.9",
   reflections: "0.9.12",
-  rocksDB: "5.18.4",
+  rocksDB: "6.16.4",

Review comment:
       🥳 

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
@@ -374,7 +374,7 @@ public void flush() {
 
     @Override
     public void close() {
-        if (globalStores.isEmpty()) {
+        if (globalStateStores.isEmpty() && globalStores.isEmpty()) {

Review comment:
       What even is the difference between `globalStateStores` and `globalStores`? I'm not surprised we had a leak, I'm pretty sure if I was looking at this code I wouldn't remember that these were actually two different variables. Maybe we can give them slightly more descriptive names in the meantime?

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java
##########
@@ -99,35 +104,7 @@ public Env getEnv() {
 
     @Override
     public Options prepareForBulkLoad() {
-        /* From https://github.com/facebook/rocksdb/wiki/RocksDB-FAQ
-         *
-         * Q: What's the fastest way to load data into RocksDB?
-         *
-         * A: A fast way to direct insert data to the DB:
-         *
-         *  1. using single writer thread and insert in sorted order
-         *  2. batch hundreds of keys into one write batch
-         *  3. use vector memtable
-         *  4. make sure options.max_background_flushes is at least 4
-         *  5. before inserting the data,
-         *       disable automatic compaction,
-         *       set options.level0_file_num_compaction_trigger,
-         *           options.level0_slowdown_writes_trigger
-         *           and options.level0_stop_writes_trigger to very large.
-         *     After inserting all the data, issue a manual compaction.
-         *
-         * 3-5 will be automatically done if you call Options::PrepareForBulkLoad() to your option
-         */
-        // (1) not in our control
-        // (2) is done via bulk-loading API
-        // (3) skipping because, not done in actual PrepareForBulkLoad() code in https://github.com/facebook/rocksdb/blob/master/options/options.cc
-        //columnFamilyOptions.setMemTableConfig(new VectorMemTableConfig());
-        // (4-5) below:
-        dbOptions.setMaxBackgroundFlushes(4);
-        columnFamilyOptions.setDisableAutoCompactions(true);
-        columnFamilyOptions.setLevel0FileNumCompactionTrigger(1 << 30);
-        columnFamilyOptions.setLevel0SlowdownWritesTrigger(1 << 30);
-        columnFamilyOptions.setLevel0StopWritesTrigger(1 << 30);

Review comment:
       I seem to remember there was some reason we didn't remove this when we disabled bulk loading...but I don't remember what that was. @guozhangwang do you?
   
   Or were these options just removed in 6.x

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java
##########
@@ -1381,6 +1364,313 @@ public WriteBufferManager writeBufferManager() {
         return dbOptions.writeBufferManager();
     }
 
+    @Override
+    public Options setMaxWriteBatchGroupSizeBytes(final long maxWriteBatchGroupSizeBytes) {
+        dbOptions.setMaxWriteBatchGroupSizeBytes(maxWriteBatchGroupSizeBytes);
+        return this;
+    }
+
+    @Override
+    public long maxWriteBatchGroupSizeBytes() {
+        return dbOptions.maxWriteBatchGroupSizeBytes();
+    }
+
+    @Override
+    public Options oldDefaults(final int majorVersion, final int minorVersion) {
+        columnFamilyOptions.oldDefaults(majorVersion, minorVersion);
+        return this;
+    }
+
+    @Override
+    public Options optimizeForSmallDb(final Cache cache) {

Review comment:
       > Use this if your DB is very small (like under 1GB) and you don't want to spend lots of memory for memtables.
   
   This looks interesting 🤔  Could be very useful for some users like ksqlDB, where many applications have small state stores cc @rodesai 

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java
##########
@@ -1381,6 +1364,313 @@ public WriteBufferManager writeBufferManager() {
         return dbOptions.writeBufferManager();
     }
 
+    @Override
+    public Options setMaxWriteBatchGroupSizeBytes(final long maxWriteBatchGroupSizeBytes) {
+        dbOptions.setMaxWriteBatchGroupSizeBytes(maxWriteBatchGroupSizeBytes);
+        return this;
+    }
+
+    @Override
+    public long maxWriteBatchGroupSizeBytes() {
+        return dbOptions.maxWriteBatchGroupSizeBytes();
+    }
+
+    @Override
+    public Options oldDefaults(final int majorVersion, final int minorVersion) {
+        columnFamilyOptions.oldDefaults(majorVersion, minorVersion);
+        return this;
+    }
+
+    @Override
+    public Options optimizeForSmallDb(final Cache cache) {
+        return super.optimizeForSmallDb(cache);
+    }
+
+    @Override
+    public AbstractCompactionFilter<? extends AbstractSlice<?>> compactionFilter() {
+        return columnFamilyOptions.compactionFilter();
+    }
+
+    @Override
+    public AbstractCompactionFilterFactory<? extends AbstractCompactionFilter<?>> compactionFilterFactory() {
+        return columnFamilyOptions.compactionFilterFactory();
+    }
+
+    @Override
+    public Options setStatsPersistPeriodSec(final int statsPersistPeriodSec) {
+        dbOptions.setStatsPersistPeriodSec(statsPersistPeriodSec);
+        return this;
+    }
+
+    @Override
+    public int statsPersistPeriodSec() {
+        return dbOptions.statsPersistPeriodSec();
+    }
+
+    @Override
+    public Options setStatsHistoryBufferSize(final long statsHistoryBufferSize) {
+        dbOptions.setStatsHistoryBufferSize(statsHistoryBufferSize);
+        return this;
+    }
+
+    @Override
+    public long statsHistoryBufferSize() {
+        return dbOptions.statsHistoryBufferSize();
+    }
+
+    @Override
+    public Options setStrictBytesPerSync(final boolean strictBytesPerSync) {
+        dbOptions.setStrictBytesPerSync(strictBytesPerSync);
+        return this;
+    }
+
+    @Override
+    public boolean strictBytesPerSync() {
+        return dbOptions.strictBytesPerSync();
+    }
+
+    @Override
+    public Options setListeners(final List<AbstractEventListener> listeners) {
+        dbOptions.setListeners(listeners);
+        return this;
+    }
+
+    @Override
+    public List<AbstractEventListener> listeners() {
+        return dbOptions.listeners();
+    }
+
+    @Override
+    public Options setEnablePipelinedWrite(final boolean enablePipelinedWrite) {
+        dbOptions.setEnablePipelinedWrite(enablePipelinedWrite);
+        return this;
+    }
+
+    @Override
+    public boolean enablePipelinedWrite() {
+        return dbOptions.enablePipelinedWrite();
+    }
+
+    @Override
+    public Options setUnorderedWrite(final boolean unorderedWrite) {
+        dbOptions.setUnorderedWrite(unorderedWrite);
+        return this;
+    }
+
+    @Override
+    public boolean unorderedWrite() {
+        return dbOptions.unorderedWrite();
+    }
+
+    @Override
+    public Options setSkipCheckingSstFileSizesOnDbOpen(final boolean skipCheckingSstFileSizesOnDbOpen) {
+        dbOptions.setSkipCheckingSstFileSizesOnDbOpen(skipCheckingSstFileSizesOnDbOpen);
+        return this;
+    }
+
+    @Override
+    public boolean skipCheckingSstFileSizesOnDbOpen() {
+        return dbOptions.skipCheckingSstFileSizesOnDbOpen();
+    }
+
+    @Override
+    public Options setWalFilter(final AbstractWalFilter walFilter) {
+        dbOptions.setWalFilter(walFilter);

Review comment:
       Should we disable/throw/ignore this, since we disable the WAL? Same for the other `wal` options

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java
##########
@@ -1381,6 +1364,313 @@ public WriteBufferManager writeBufferManager() {
         return dbOptions.writeBufferManager();
     }
 
+    @Override
+    public Options setMaxWriteBatchGroupSizeBytes(final long maxWriteBatchGroupSizeBytes) {
+        dbOptions.setMaxWriteBatchGroupSizeBytes(maxWriteBatchGroupSizeBytes);
+        return this;
+    }
+
+    @Override
+    public long maxWriteBatchGroupSizeBytes() {
+        return dbOptions.maxWriteBatchGroupSizeBytes();
+    }
+
+    @Override
+    public Options oldDefaults(final int majorVersion, final int minorVersion) {
+        columnFamilyOptions.oldDefaults(majorVersion, minorVersion);
+        return this;
+    }
+
+    @Override
+    public Options optimizeForSmallDb(final Cache cache) {
+        return super.optimizeForSmallDb(cache);
+    }
+
+    @Override
+    public AbstractCompactionFilter<? extends AbstractSlice<?>> compactionFilter() {
+        return columnFamilyOptions.compactionFilter();
+    }
+
+    @Override
+    public AbstractCompactionFilterFactory<? extends AbstractCompactionFilter<?>> compactionFilterFactory() {
+        return columnFamilyOptions.compactionFilterFactory();
+    }
+
+    @Override
+    public Options setStatsPersistPeriodSec(final int statsPersistPeriodSec) {
+        dbOptions.setStatsPersistPeriodSec(statsPersistPeriodSec);
+        return this;
+    }
+
+    @Override
+    public int statsPersistPeriodSec() {
+        return dbOptions.statsPersistPeriodSec();
+    }
+
+    @Override
+    public Options setStatsHistoryBufferSize(final long statsHistoryBufferSize) {
+        dbOptions.setStatsHistoryBufferSize(statsHistoryBufferSize);
+        return this;
+    }
+
+    @Override
+    public long statsHistoryBufferSize() {
+        return dbOptions.statsHistoryBufferSize();
+    }
+
+    @Override
+    public Options setStrictBytesPerSync(final boolean strictBytesPerSync) {
+        dbOptions.setStrictBytesPerSync(strictBytesPerSync);
+        return this;
+    }
+
+    @Override
+    public boolean strictBytesPerSync() {
+        return dbOptions.strictBytesPerSync();
+    }
+
+    @Override
+    public Options setListeners(final List<AbstractEventListener> listeners) {
+        dbOptions.setListeners(listeners);
+        return this;
+    }
+
+    @Override
+    public List<AbstractEventListener> listeners() {
+        return dbOptions.listeners();
+    }
+
+    @Override
+    public Options setEnablePipelinedWrite(final boolean enablePipelinedWrite) {
+        dbOptions.setEnablePipelinedWrite(enablePipelinedWrite);
+        return this;
+    }
+
+    @Override
+    public boolean enablePipelinedWrite() {
+        return dbOptions.enablePipelinedWrite();
+    }
+
+    @Override
+    public Options setUnorderedWrite(final boolean unorderedWrite) {
+        dbOptions.setUnorderedWrite(unorderedWrite);
+        return this;
+    }
+
+    @Override
+    public boolean unorderedWrite() {
+        return dbOptions.unorderedWrite();
+    }
+
+    @Override
+    public Options setSkipCheckingSstFileSizesOnDbOpen(final boolean skipCheckingSstFileSizesOnDbOpen) {
+        dbOptions.setSkipCheckingSstFileSizesOnDbOpen(skipCheckingSstFileSizesOnDbOpen);
+        return this;
+    }
+
+    @Override
+    public boolean skipCheckingSstFileSizesOnDbOpen() {
+        return dbOptions.skipCheckingSstFileSizesOnDbOpen();
+    }
+
+    @Override
+    public Options setWalFilter(final AbstractWalFilter walFilter) {
+        dbOptions.setWalFilter(walFilter);
+        return this;
+    }
+
+    @Override
+    public WalFilter walFilter() {
+        return dbOptions.walFilter();
+    }
+
+    @Override
+    public Options setAllowIngestBehind(final boolean allowIngestBehind) {
+        dbOptions.setAllowIngestBehind(allowIngestBehind);
+        return this;
+    }
+
+    @Override
+    public boolean allowIngestBehind() {
+        return dbOptions.allowIngestBehind();
+    }
+
+    @Override
+    public Options setPreserveDeletes(final boolean preserveDeletes) {
+        dbOptions.setPreserveDeletes(preserveDeletes);
+        return this;
+    }
+
+    @Override
+    public boolean preserveDeletes() {
+        return dbOptions.preserveDeletes();
+    }
+
+    @Override
+    public Options setTwoWriteQueues(final boolean twoWriteQueues) {
+        dbOptions.setTwoWriteQueues(twoWriteQueues);
+        return this;
+    }
+
+    @Override
+    public boolean twoWriteQueues() {
+        return dbOptions.twoWriteQueues();
+    }
+
+    @Override
+    public Options setManualWalFlush(final boolean manualWalFlush) {
+        dbOptions.setManualWalFlush(manualWalFlush);
+        return this;
+    }
+
+    @Override
+    public boolean manualWalFlush() {
+        return dbOptions.manualWalFlush();
+    }
+
+    @Override
+    public Options setCfPaths(final Collection<DbPath> cfPaths) {
+        columnFamilyOptions.setCfPaths(cfPaths);
+        return this;
+    }
+
+    @Override
+    public List<DbPath> cfPaths() {
+        return columnFamilyOptions.cfPaths();
+    }
+
+    @Override
+    public Options setBottommostCompressionOptions(final CompressionOptions bottommostCompressionOptions) {
+        columnFamilyOptions.setBottommostCompressionOptions(bottommostCompressionOptions);
+        return this;
+    }
+
+    @Override
+    public CompressionOptions bottommostCompressionOptions() {
+        return columnFamilyOptions.bottommostCompressionOptions();
+    }
+
+    @Override
+    public Options setTtl(final long ttl) {
+        columnFamilyOptions.setTtl(ttl);
+        return this;
+    }
+
+    @Override
+    public long ttl() {
+        return columnFamilyOptions.ttl();
+    }
+
+    @Override
+    public Options setAtomicFlush(final boolean atomicFlush) {
+        dbOptions.setAtomicFlush(atomicFlush);
+        return this;
+    }
+
+    @Override
+    public boolean atomicFlush() {
+        return dbOptions.atomicFlush();
+    }
+
+    @Override
+    public Options setAvoidUnnecessaryBlockingIO(final boolean avoidUnnecessaryBlockingIO) {
+        dbOptions.setAvoidUnnecessaryBlockingIO(avoidUnnecessaryBlockingIO);
+        return this;
+    }
+
+    @Override
+    public boolean avoidUnnecessaryBlockingIO() {
+        return dbOptions.avoidUnnecessaryBlockingIO();
+    }
+
+    @Override
+    public Options setPersistStatsToDisk(final boolean persistStatsToDisk) {
+        dbOptions.setPersistStatsToDisk(persistStatsToDisk);
+        return this;
+    }
+
+    @Override
+    public boolean persistStatsToDisk() {
+        return dbOptions.persistStatsToDisk();
+    }
+
+    @Override
+    public Options setWriteDbidToManifest(final boolean writeDbidToManifest) {
+        dbOptions.setWriteDbidToManifest(writeDbidToManifest);
+        return this;
+    }
+
+    @Override
+    public boolean writeDbidToManifest() {
+        return dbOptions.writeDbidToManifest();
+    }
+
+    @Override
+    public Options setLogReadaheadSize(final long logReadaheadSize) {
+        dbOptions.setLogReadaheadSize(logReadaheadSize);
+        return this;
+    }
+
+    @Override
+    public long logReadaheadSize() {
+        return dbOptions.logReadaheadSize();
+    }
+
+    @Override
+    public Options setBestEffortsRecovery(final boolean bestEffortsRecovery) {
+        dbOptions.setBestEffortsRecovery(bestEffortsRecovery);
+        return this;
+    }
+
+    @Override
+    public boolean bestEffortsRecovery() {
+        return dbOptions.bestEffortsRecovery();
+    }
+
+    @Override
+    public Options setMaxBgErrorResumeCount(final int maxBgerrorResumeCount) {
+        dbOptions.setMaxBgErrorResumeCount(maxBgerrorResumeCount);
+        return this;
+    }
+
+    @Override
+    public int maxBgerrorResumeCount() {
+        return dbOptions.maxBgerrorResumeCount();
+    }
+
+    @Override
+    public Options setBgerrorResumeRetryInterval(final long bgerrorResumeRetryInterval) {
+        dbOptions.setBgerrorResumeRetryInterval(bgerrorResumeRetryInterval);
+        return this;
+    }
+
+    @Override
+    public long bgerrorResumeRetryInterval() {
+        return dbOptions.bgerrorResumeRetryInterval();
+    }
+
+    @Override
+    public Options setSstPartitionerFactory(final SstPartitionerFactory sstPartitionerFactory) {
+        columnFamilyOptions.setSstPartitionerFactory(sstPartitionerFactory);
+        return this;
+    }
+
+    @Override
+    public SstPartitionerFactory sstPartitionerFactory() {
+        return columnFamilyOptions.sstPartitionerFactory();
+    }
+
+    @Override
+    public Options setCompactionThreadLimiter(final ConcurrentTaskLimiter compactionThreadLimiter) {

Review comment:
       Another interesting config, as my impression has been that background compactions are one of the biggest limiting factors in scaling up the state stores and/or partitions. But I'm not yet sure what the difference between this and just limiting the number of compaction threads is..

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java
##########
@@ -1381,6 +1364,313 @@ public WriteBufferManager writeBufferManager() {
         return dbOptions.writeBufferManager();
     }
 
+    @Override
+    public Options setMaxWriteBatchGroupSizeBytes(final long maxWriteBatchGroupSizeBytes) {
+        dbOptions.setMaxWriteBatchGroupSizeBytes(maxWriteBatchGroupSizeBytes);
+        return this;
+    }
+
+    @Override
+    public long maxWriteBatchGroupSizeBytes() {
+        return dbOptions.maxWriteBatchGroupSizeBytes();
+    }
+
+    @Override
+    public Options oldDefaults(final int majorVersion, final int minorVersion) {
+        columnFamilyOptions.oldDefaults(majorVersion, minorVersion);
+        return this;
+    }
+
+    @Override
+    public Options optimizeForSmallDb(final Cache cache) {
+        return super.optimizeForSmallDb(cache);
+    }
+
+    @Override
+    public AbstractCompactionFilter<? extends AbstractSlice<?>> compactionFilter() {
+        return columnFamilyOptions.compactionFilter();
+    }
+
+    @Override
+    public AbstractCompactionFilterFactory<? extends AbstractCompactionFilter<?>> compactionFilterFactory() {
+        return columnFamilyOptions.compactionFilterFactory();
+    }
+
+    @Override
+    public Options setStatsPersistPeriodSec(final int statsPersistPeriodSec) {
+        dbOptions.setStatsPersistPeriodSec(statsPersistPeriodSec);
+        return this;
+    }
+
+    @Override
+    public int statsPersistPeriodSec() {
+        return dbOptions.statsPersistPeriodSec();
+    }
+
+    @Override
+    public Options setStatsHistoryBufferSize(final long statsHistoryBufferSize) {
+        dbOptions.setStatsHistoryBufferSize(statsHistoryBufferSize);
+        return this;
+    }
+
+    @Override
+    public long statsHistoryBufferSize() {
+        return dbOptions.statsHistoryBufferSize();
+    }
+
+    @Override
+    public Options setStrictBytesPerSync(final boolean strictBytesPerSync) {
+        dbOptions.setStrictBytesPerSync(strictBytesPerSync);
+        return this;
+    }
+
+    @Override
+    public boolean strictBytesPerSync() {
+        return dbOptions.strictBytesPerSync();
+    }
+
+    @Override
+    public Options setListeners(final List<AbstractEventListener> listeners) {
+        dbOptions.setListeners(listeners);
+        return this;
+    }
+
+    @Override
+    public List<AbstractEventListener> listeners() {
+        return dbOptions.listeners();
+    }
+
+    @Override
+    public Options setEnablePipelinedWrite(final boolean enablePipelinedWrite) {
+        dbOptions.setEnablePipelinedWrite(enablePipelinedWrite);
+        return this;
+    }
+
+    @Override
+    public boolean enablePipelinedWrite() {
+        return dbOptions.enablePipelinedWrite();
+    }
+
+    @Override
+    public Options setUnorderedWrite(final boolean unorderedWrite) {
+        dbOptions.setUnorderedWrite(unorderedWrite);
+        return this;
+    }
+
+    @Override
+    public boolean unorderedWrite() {
+        return dbOptions.unorderedWrite();
+    }
+
+    @Override
+    public Options setSkipCheckingSstFileSizesOnDbOpen(final boolean skipCheckingSstFileSizesOnDbOpen) {
+        dbOptions.setSkipCheckingSstFileSizesOnDbOpen(skipCheckingSstFileSizesOnDbOpen);
+        return this;
+    }
+
+    @Override
+    public boolean skipCheckingSstFileSizesOnDbOpen() {
+        return dbOptions.skipCheckingSstFileSizesOnDbOpen();
+    }
+
+    @Override
+    public Options setWalFilter(final AbstractWalFilter walFilter) {
+        dbOptions.setWalFilter(walFilter);
+        return this;
+    }
+
+    @Override
+    public WalFilter walFilter() {
+        return dbOptions.walFilter();
+    }
+
+    @Override
+    public Options setAllowIngestBehind(final boolean allowIngestBehind) {
+        dbOptions.setAllowIngestBehind(allowIngestBehind);
+        return this;
+    }
+
+    @Override
+    public boolean allowIngestBehind() {
+        return dbOptions.allowIngestBehind();
+    }
+
+    @Override
+    public Options setPreserveDeletes(final boolean preserveDeletes) {
+        dbOptions.setPreserveDeletes(preserveDeletes);
+        return this;
+    }
+
+    @Override
+    public boolean preserveDeletes() {
+        return dbOptions.preserveDeletes();
+    }
+
+    @Override
+    public Options setTwoWriteQueues(final boolean twoWriteQueues) {
+        dbOptions.setTwoWriteQueues(twoWriteQueues);
+        return this;
+    }
+
+    @Override
+    public boolean twoWriteQueues() {
+        return dbOptions.twoWriteQueues();
+    }
+
+    @Override
+    public Options setManualWalFlush(final boolean manualWalFlush) {
+        dbOptions.setManualWalFlush(manualWalFlush);
+        return this;
+    }
+
+    @Override
+    public boolean manualWalFlush() {
+        return dbOptions.manualWalFlush();
+    }
+
+    @Override
+    public Options setCfPaths(final Collection<DbPath> cfPaths) {
+        columnFamilyOptions.setCfPaths(cfPaths);
+        return this;
+    }
+
+    @Override
+    public List<DbPath> cfPaths() {
+        return columnFamilyOptions.cfPaths();
+    }
+
+    @Override
+    public Options setBottommostCompressionOptions(final CompressionOptions bottommostCompressionOptions) {
+        columnFamilyOptions.setBottommostCompressionOptions(bottommostCompressionOptions);
+        return this;
+    }
+
+    @Override
+    public CompressionOptions bottommostCompressionOptions() {
+        return columnFamilyOptions.bottommostCompressionOptions();
+    }
+
+    @Override
+    public Options setTtl(final long ttl) {
+        columnFamilyOptions.setTtl(ttl);
+        return this;
+    }
+
+    @Override
+    public long ttl() {
+        return columnFamilyOptions.ttl();
+    }
+
+    @Override
+    public Options setAtomicFlush(final boolean atomicFlush) {
+        dbOptions.setAtomicFlush(atomicFlush);
+        return this;
+    }
+
+    @Override
+    public boolean atomicFlush() {
+        return dbOptions.atomicFlush();
+    }
+
+    @Override
+    public Options setAvoidUnnecessaryBlockingIO(final boolean avoidUnnecessaryBlockingIO) {

Review comment:
       this also seems quite interesting...possibly something we should enable by default? I filed [KAFKA-12748](https://issues.apache.org/jira/browse/KAFKA-12748) to explore some of these new options and run benchmarks to determine which should be configured by default

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java
##########
@@ -1381,6 +1364,313 @@ public WriteBufferManager writeBufferManager() {
         return dbOptions.writeBufferManager();
     }
 
+    @Override
+    public Options setMaxWriteBatchGroupSizeBytes(final long maxWriteBatchGroupSizeBytes) {
+        dbOptions.setMaxWriteBatchGroupSizeBytes(maxWriteBatchGroupSizeBytes);
+        return this;
+    }
+
+    @Override
+    public long maxWriteBatchGroupSizeBytes() {
+        return dbOptions.maxWriteBatchGroupSizeBytes();
+    }
+
+    @Override
+    public Options oldDefaults(final int majorVersion, final int minorVersion) {
+        columnFamilyOptions.oldDefaults(majorVersion, minorVersion);
+        return this;
+    }
+
+    @Override
+    public Options optimizeForSmallDb(final Cache cache) {
+        return super.optimizeForSmallDb(cache);
+    }
+
+    @Override
+    public AbstractCompactionFilter<? extends AbstractSlice<?>> compactionFilter() {
+        return columnFamilyOptions.compactionFilter();
+    }
+
+    @Override
+    public AbstractCompactionFilterFactory<? extends AbstractCompactionFilter<?>> compactionFilterFactory() {
+        return columnFamilyOptions.compactionFilterFactory();
+    }
+
+    @Override
+    public Options setStatsPersistPeriodSec(final int statsPersistPeriodSec) {
+        dbOptions.setStatsPersistPeriodSec(statsPersistPeriodSec);
+        return this;
+    }
+
+    @Override
+    public int statsPersistPeriodSec() {
+        return dbOptions.statsPersistPeriodSec();
+    }
+
+    @Override
+    public Options setStatsHistoryBufferSize(final long statsHistoryBufferSize) {
+        dbOptions.setStatsHistoryBufferSize(statsHistoryBufferSize);
+        return this;
+    }
+
+    @Override
+    public long statsHistoryBufferSize() {
+        return dbOptions.statsHistoryBufferSize();
+    }
+
+    @Override
+    public Options setStrictBytesPerSync(final boolean strictBytesPerSync) {
+        dbOptions.setStrictBytesPerSync(strictBytesPerSync);
+        return this;
+    }
+
+    @Override
+    public boolean strictBytesPerSync() {
+        return dbOptions.strictBytesPerSync();
+    }
+
+    @Override
+    public Options setListeners(final List<AbstractEventListener> listeners) {
+        dbOptions.setListeners(listeners);
+        return this;
+    }
+
+    @Override
+    public List<AbstractEventListener> listeners() {
+        return dbOptions.listeners();
+    }
+
+    @Override
+    public Options setEnablePipelinedWrite(final boolean enablePipelinedWrite) {
+        dbOptions.setEnablePipelinedWrite(enablePipelinedWrite);
+        return this;
+    }
+
+    @Override
+    public boolean enablePipelinedWrite() {
+        return dbOptions.enablePipelinedWrite();
+    }
+
+    @Override
+    public Options setUnorderedWrite(final boolean unorderedWrite) {
+        dbOptions.setUnorderedWrite(unorderedWrite);
+        return this;
+    }
+
+    @Override
+    public boolean unorderedWrite() {
+        return dbOptions.unorderedWrite();
+    }
+
+    @Override
+    public Options setSkipCheckingSstFileSizesOnDbOpen(final boolean skipCheckingSstFileSizesOnDbOpen) {
+        dbOptions.setSkipCheckingSstFileSizesOnDbOpen(skipCheckingSstFileSizesOnDbOpen);
+        return this;
+    }
+
+    @Override
+    public boolean skipCheckingSstFileSizesOnDbOpen() {
+        return dbOptions.skipCheckingSstFileSizesOnDbOpen();
+    }
+
+    @Override
+    public Options setWalFilter(final AbstractWalFilter walFilter) {
+        dbOptions.setWalFilter(walFilter);
+        return this;
+    }
+
+    @Override
+    public WalFilter walFilter() {
+        return dbOptions.walFilter();
+    }
+
+    @Override
+    public Options setAllowIngestBehind(final boolean allowIngestBehind) {
+        dbOptions.setAllowIngestBehind(allowIngestBehind);
+        return this;
+    }
+
+    @Override
+    public boolean allowIngestBehind() {
+        return dbOptions.allowIngestBehind();
+    }
+
+    @Override
+    public Options setPreserveDeletes(final boolean preserveDeletes) {
+        dbOptions.setPreserveDeletes(preserveDeletes);
+        return this;
+    }
+
+    @Override
+    public boolean preserveDeletes() {
+        return dbOptions.preserveDeletes();
+    }
+
+    @Override
+    public Options setTwoWriteQueues(final boolean twoWriteQueues) {
+        dbOptions.setTwoWriteQueues(twoWriteQueues);
+        return this;
+    }
+
+    @Override
+    public boolean twoWriteQueues() {
+        return dbOptions.twoWriteQueues();
+    }
+
+    @Override
+    public Options setManualWalFlush(final boolean manualWalFlush) {
+        dbOptions.setManualWalFlush(manualWalFlush);
+        return this;
+    }
+
+    @Override
+    public boolean manualWalFlush() {
+        return dbOptions.manualWalFlush();
+    }
+
+    @Override
+    public Options setCfPaths(final Collection<DbPath> cfPaths) {
+        columnFamilyOptions.setCfPaths(cfPaths);
+        return this;
+    }
+
+    @Override
+    public List<DbPath> cfPaths() {
+        return columnFamilyOptions.cfPaths();
+    }
+
+    @Override
+    public Options setBottommostCompressionOptions(final CompressionOptions bottommostCompressionOptions) {
+        columnFamilyOptions.setBottommostCompressionOptions(bottommostCompressionOptions);
+        return this;
+    }
+
+    @Override
+    public CompressionOptions bottommostCompressionOptions() {
+        return columnFamilyOptions.bottommostCompressionOptions();
+    }
+
+    @Override
+    public Options setTtl(final long ttl) {
+        columnFamilyOptions.setTtl(ttl);

Review comment:
       Does this still require setting a specific compaction style (I think it was the FIFO style)? If so maybe we should enforce that in some way, or else at least log a warning that they are incompatible. Of course I'm not quite sure what would happen if you tried to set the ttl without the FIFO compaction, maybe RocksDB would throw an exception upon opening, or just silently ignore the ttl, or... who knows. Might be worth a quick test to find out

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
@@ -396,6 +396,21 @@ public void close() {
                 log.info("Skipping to close non-initialized store {}", entry.getKey());
             }
         }
+        for (final StateStore store : globalStateStores) {

Review comment:
       Looking through the code in this class,  this feels a bit weird. Seems like the only way we could have (open) stores in `globalStateStores` but not in `globalStores` is when we hit an exception while trying to close the store in `globalStores`. I'm all for retrying in theory (though I'm willing to bet 99% of the time it's either permanent/fatal) but if that's what we intend to do, we should just retry it right after we hit that exception so it's clear what we're doing. Looping through this other structure like this feels very odd, as it implies these are distinct sets of StateStores instead of two data structures which should reflect the same set of underlying StateStores




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org