Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/07/29 17:48:38 UTC

[GitHub] [kafka] cadonna opened a new pull request #9098: KAFKA-9924: Prepare RocksDB and metrics for RocksDB properties recording

cadonna opened a new pull request #9098:
URL: https://github.com/apache/kafka/pull/9098


   This PR refactors the RocksDB store and the metrics infrastructure in Streams
   in preparation for recording the RocksDB properties specified in KIP-607.
   
   The refactoring includes:
   - A wrapper around `BlockBasedTableConfig` makes the cache accessible to the
      RocksDB metrics recorder.
   - The RocksDB metrics recorder now also takes the DB instance and the cache in
      addition to the statistics.
   - The value providers for the metrics are added to the RocksDB metrics recorder
      even if the recording level is INFO.
   - The creation of the RocksDB metrics recording trigger is moved to `StreamsMetricsImpl`.
   
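The first bullet rests on a simple idea: if a table config accepts the block cache through a setter but offers no getter for it, a subclass can remember the reference when the setter is called. Below is a minimal, runnable sketch of that pattern. To avoid a RocksDB dependency, `Cache`, `TableConfig` and `TableConfigWithAccessibleCache` are hypothetical stand-ins that only mimic the shape of RocksJava's `BlockBasedTableConfig`; they are not the actual Streams wrapper.

```java
// Hypothetical stand-in for org.rocksdb.Cache; only object identity matters here.
class Cache {
    private final String name;

    Cache(final String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return "Cache(" + name + ")";
    }
}

// Hypothetical stand-in for a table config that accepts a block cache but
// offers no getter for it.
class TableConfig {
    private Cache blockCache;

    TableConfig setBlockCache(final Cache cache) {
        this.blockCache = cache;
        return this;
    }
}

// Sketch of the wrapper idea: remember the cache when the setter is called so
// that a metrics recorder can read it later.
class TableConfigWithAccessibleCache extends TableConfig {
    private Cache accessibleBlockCache = null;

    @Override
    TableConfig setBlockCache(final Cache cache) {
        accessibleBlockCache = cache;        // keep a reference for the metrics recorder
        return super.setBlockCache(cache);   // preserve the original behaviour
    }

    Cache blockCache() {
        return accessibleBlockCache;
    }
}
```

The capture only works if the setter is invoked on the wrapper object itself; a cache set on some other config instance stays invisible, which is the crux of the review discussion on this PR.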
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on pull request #9098: KAFKA-9924: Prepare RocksDB and metrics for RocksDB properties recording

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #9098:
URL: https://github.com/apache/kafka/pull/9098#issuecomment-673100824


   @cadonna : 
   
   ```
   > Task :streams:checkstyleMain
   [ant:checkstyle] [ERROR] /home/confluent/kafka/streams/src/generated/java/org/apache/kafka/streams/InteractiveQueryPartitionResponse.java:1: Line does not match expected header line of '/*'. [Header]
   ```





[GitHub] [kafka] vvcephei removed a comment on pull request #9098: KAFKA-9924: Prepare RocksDB and metrics for RocksDB properties recording

Posted by GitBox <gi...@apache.org>.
vvcephei removed a comment on pull request #9098:
URL: https://github.com/apache/kafka/pull/9098#issuecomment-673100824







[GitHub] [kafka] vvcephei commented on a change in pull request #9098: KAFKA-9924: Prepare RocksDB and metrics for RocksDB properties recording

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #9098:
URL: https://github.com/apache/kafka/pull/9098#discussion_r463128398



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
##########
@@ -181,23 +181,39 @@ void openDB(final ProcessorContext context) {
             throw new ProcessorStateException(fatal);
         }
 
-        // Setup metrics before the database is opened, otherwise the metrics are not updated
+        // Setup statistics before the database is opened, otherwise the statistics are not updated
         // with the measurements from Rocks DB
-        maybeSetUpMetricsRecorder(configs);
+        maybeSetUpStatistics(configs);
 
         openRocksDB(dbOptions, columnFamilyOptions);
         open = true;
+
+        addValueProvidersToMetricsRecorder(configs);
     }
 
-    private void maybeSetUpMetricsRecorder(final Map<String, Object> configs) {
-        if (userSpecifiedOptions.statistics() == null &&
+    private void maybeSetUpStatistics(final Map<String, Object> configs) {
+        if (userSpecifiedOptions.statistics() != null) {
+            userSpecifiedStatistics = true;
+        }
+        if (!userSpecifiedStatistics &&
             RecordingLevel.forName((String) configs.get(METRICS_RECORDING_LEVEL_CONFIG)) == RecordingLevel.DEBUG) {
 
-            isStatisticsRegistered = true;
             // metrics recorder will clean up statistics object
             final Statistics statistics = new Statistics();
             userSpecifiedOptions.setStatistics(statistics);
-            metricsRecorder.addStatistics(name, statistics);
+        }
+    }
+
+    private void addValueProvidersToMetricsRecorder(final Map<String, Object> configs) {
+        final TableFormatConfig tableFormatConfig = userSpecifiedOptions.tableFormatConfig();
+        final Statistics statistics = userSpecifiedStatistics ? null : userSpecifiedOptions.statistics();
+        if (tableFormatConfig instanceof BlockBasedTableConfigWithAccessibleCache) {
+            final Cache cache = ((BlockBasedTableConfigWithAccessibleCache) tableFormatConfig).blockCache();
+            metricsRecorder.addValueProviders(name, db, cache, statistics);
+        } else {
+            metricsRecorder.addValueProviders(name, db, null, statistics);
+            log.warn("A table format configuration is used that does not expose the block cache. This means " +
+                "that metrics that relate to the block cache may be wrong if the block cache is shared.");
         }

Review comment:
       Ah, after reading your test, I now see the issue. I'd overlooked the fact that users would independently construct the table config object AND the cache. I see now that this makes it impossible to reliably capture the cache, since users have to actually choose to pass our special table config to the Options and then pass the Cache to that table config.
   
   This doesn't seem ideal. What do you think about just using reflection instead?
   
   ```suggestion
           if (tableFormatConfig instanceof BlockBasedTableConfig) {
               final BlockBasedTableConfig blockBasedTableConfig = (BlockBasedTableConfig) tableFormatConfig;
               try {
                   final Field blockCacheField = BlockBasedTableConfig.class.getDeclaredField("blockCache_");
                   blockCacheField.setAccessible(true);
                   final Cache nullableBlockCache = (Cache) blockCacheField.get(blockBasedTableConfig);
                   metricsRecorder.addValueProviders(name, db, nullableBlockCache, statistics);
               } catch (final NoSuchFieldException | IllegalAccessException | ClassCastException e) {
                   log.warn("Expected to find and access field 'blockCache_' in BlockBasedTableConfig. " +
                                "Probably, an incompatible version of RocksDB is being used. " +
                                "Cache will be missing from memory metrics.", e);
                   metricsRecorder.addValueProviders(name, db, null, statistics);
               }
           } else {
               metricsRecorder.addValueProviders(name, db, null, statistics);
           }
   ```
   
   We would obviously test all the branches here to de-risk the reflection. We can also add a test that searches the classpath for implementations of TableFormatConfig to ensure we don't miss the memo if RocksDB adds a new TableFormatConfig implementation.
   
   Alternative thought, if you don't like the reflection: We would _also_ subclass Options and override `org.rocksdb.Options#setTableFormatConfig` to check if the passed `TableFormatConfig` is a `BlockBasedTableConfig`, and if so, then _we_ wrap it with `BlockBasedTableConfigWithAccessibleCache`.
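A self-contained sketch of the reflection approach, including the fallback when the private field cannot be read. `LegacyTableConfig` and `BlockCacheReflection` are hypothetical; the field name `blockCache_` comes from the suggestion above and is an assumption about RocksJava internals that may not hold for every RocksDB release.

```java
import java.lang.reflect.Field;

// Hypothetical stand-in for a third-party config class that keeps its cache private.
class LegacyTableConfig {
    private Object blockCache_;

    LegacyTableConfig setBlockCache(final Object cache) {
        this.blockCache_ = cache;
        return this;
    }
}

final class BlockCacheReflection {
    private BlockCacheReflection() { }

    // Reads a private field declared on declaringClass. Returns null if the field is
    // missing, inaccessible, or holds a value of an unexpected type, so callers can
    // fall back to registering the value providers without a cache.
    static <T> T readPrivateField(final Class<?> declaringClass,
                                  final Object target,
                                  final String fieldName,
                                  final Class<T> type) {
        try {
            final Field field = declaringClass.getDeclaredField(fieldName);
            field.setAccessible(true);
            return type.cast(field.get(target));
        } catch (final NoSuchFieldException | IllegalAccessException
                       | IllegalArgumentException | ClassCastException e) {
            // In the store, this is where a warning would be logged.
            return null;
        }
    }
}
```

Passing the declaring class explicitly matters because `getDeclaredField` only searches that exact class, not its superclasses; looking the field up via `target.getClass()` would miss it for any subclass of the config.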

##########
File path: streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
##########
@@ -227,7 +228,8 @@ public MockProcessorContext(final Properties config, final TaskId taskId, final
         this.metrics = new StreamsMetricsImpl(
             new Metrics(metricConfig),
             threadId,
-            streamsConfig.getString(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG)
+            streamsConfig.getString(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG),
+            Time.SYSTEM

Review comment:
       Interesting... Should we add a `Time` argument to the constructor? It would be a minor amendment to the KIP.







[GitHub] [kafka] guozhangwang commented on a change in pull request #9098: KAFKA-9924: Prepare RocksDB and metrics for RocksDB properties recording

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #9098:
URL: https://github.com/apache/kafka/pull/9098#discussion_r468775002



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
##########
@@ -181,26 +181,42 @@ void openDB(final ProcessorContext context) {
             throw new ProcessorStateException(fatal);
         }
 
-        // Setup metrics before the database is opened, otherwise the metrics are not updated
+        // Setup statistics before the database is opened, otherwise the statistics are not updated
         // with the measurements from Rocks DB
-        maybeSetUpMetricsRecorder(configs);
+        maybeSetUpStatistics(configs);
 
         openRocksDB(dbOptions, columnFamilyOptions);
         open = true;
+
+        addValueProvidersToMetricsRecorder();
     }
 
-    private void maybeSetUpMetricsRecorder(final Map<String, Object> configs) {
-        if (userSpecifiedOptions.statistics() == null &&
+    private void maybeSetUpStatistics(final Map<String, Object> configs) {
+        if (userSpecifiedOptions.statistics() != null) {

Review comment:
       Could this ever be null actually? I think even in unit tests the DBOptions would contain stats?







[GitHub] [kafka] cadonna commented on pull request #9098: KAFKA-9924: Prepare RocksDB and metrics for RocksDB properties recording

Posted by GitBox <gi...@apache.org>.
cadonna commented on pull request #9098:
URL: https://github.com/apache/kafka/pull/9098#issuecomment-673329185


   ok to test





[GitHub] [kafka] guozhangwang commented on a change in pull request #9098: KAFKA-9924: Prepare RocksDB and metrics for RocksDB properties recording

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #9098:
URL: https://github.com/apache/kafka/pull/9098#discussion_r468891122



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
##########
@@ -181,26 +181,42 @@ void openDB(final ProcessorContext context) {
             throw new ProcessorStateException(fatal);
         }
 
-        // Setup metrics before the database is opened, otherwise the metrics are not updated
+        // Setup statistics before the database is opened, otherwise the statistics are not updated
         // with the measurements from Rocks DB
-        maybeSetUpMetricsRecorder(configs);
+        maybeSetUpStatistics(configs);
 
         openRocksDB(dbOptions, columnFamilyOptions);
         open = true;
+
+        addValueProvidersToMetricsRecorder();
     }
 
-    private void maybeSetUpMetricsRecorder(final Map<String, Object> configs) {
-        if (userSpecifiedOptions.statistics() == null &&
+    private void maybeSetUpStatistics(final Map<String, Object> configs) {
+        if (userSpecifiedOptions.statistics() != null) {

Review comment:
       I was thinking that, if we know `maybeSetUpStatistics` is only called once in the lifetime of the store, then here we could check that `userSpecifiedOptions.statistics() == null` and otherwise throw an `IllegalStateException`. Then, in `addValueProvidersToMetricsRecorder`, we could check that `userSpecifiedOptions.statistics() != null` and otherwise throw an `IllegalStateException`.
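A minimal sketch of the invariant checks proposed above, assuming `maybeSetUpStatistics` runs exactly once per store before the database is opened. `StatisticsLifecycle` is hypothetical and `Object` stands in for `org.rocksdb.Statistics`. Two further assumptions: the second check only applies at DEBUG level, because no statistics object is created at INFO, and user-supplied statistics (which the store tracks via `userSpecifiedStatistics`) are not modelled.

```java
// Hypothetical sketch of the lifecycle checks; Object stands in for org.rocksdb.Statistics.
final class StatisticsLifecycle {
    private Object statistics = null;

    // Expected to be called once, before the database is opened.
    void maybeSetUpStatistics(final boolean debugRecordingLevel) {
        if (statistics != null) {
            throw new IllegalStateException(
                "Statistics already set up; expected one call per store lifetime");
        }
        if (debugRecordingLevel) {
            statistics = new Object();
        }
    }

    // Called after the database is opened; at DEBUG level the statistics must exist by now.
    Object statisticsForValueProviders(final boolean debugRecordingLevel) {
        if (debugRecordingLevel && statistics == null) {
            throw new IllegalStateException(
                "Statistics were not set up before the database was opened");
        }
        return statistics;
    }
}
```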







[GitHub] [kafka] vvcephei commented on pull request #9098: KAFKA-9924: Prepare RocksDB and metrics for RocksDB properties recording

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #9098:
URL: https://github.com/apache/kafka/pull/9098#issuecomment-672975857









[GitHub] [kafka] vvcephei commented on a change in pull request #9098: KAFKA-9924: Prepare RocksDB and metrics for RocksDB properties recording

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #9098:
URL: https://github.com/apache/kafka/pull/9098#discussion_r469327500



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java
##########
@@ -187,37 +206,39 @@ public void record(final long now) {
         long bytesReadDuringCompaction = 0;
         long numberOfOpenFiles = 0;
         long numberOfFileErrors = 0;
-        for (final DbAndCacheAndStatistics valueProviders : storeToValueProviders.values()) {
-            bytesWrittenToDatabase += valueProviders.statistics.getAndResetTickerCount(TickerType.BYTES_WRITTEN);
-            bytesReadFromDatabase += valueProviders.statistics.getAndResetTickerCount(TickerType.BYTES_READ);
-            memtableBytesFlushed += valueProviders.statistics.getAndResetTickerCount(TickerType.FLUSH_WRITE_BYTES);
-            memtableHits += valueProviders.statistics.getAndResetTickerCount(TickerType.MEMTABLE_HIT);
-            memtableMisses += valueProviders.statistics.getAndResetTickerCount(TickerType.MEMTABLE_MISS);
-            blockCacheDataHits += valueProviders.statistics.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_HIT);
-            blockCacheDataMisses += valueProviders.statistics.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_MISS);
-            blockCacheIndexHits += valueProviders.statistics.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_HIT);
-            blockCacheIndexMisses += valueProviders.statistics.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_MISS);
-            blockCacheFilterHits += valueProviders.statistics.getAndResetTickerCount(TickerType.BLOCK_CACHE_FILTER_HIT);
-            blockCacheFilterMisses += valueProviders.statistics.getAndResetTickerCount(TickerType.BLOCK_CACHE_FILTER_MISS);
-            writeStallDuration += valueProviders.statistics.getAndResetTickerCount(TickerType.STALL_MICROS);
-            bytesWrittenDuringCompaction += valueProviders.statistics.getAndResetTickerCount(TickerType.COMPACT_WRITE_BYTES);
-            bytesReadDuringCompaction += valueProviders.statistics.getAndResetTickerCount(TickerType.COMPACT_READ_BYTES);
-            numberOfOpenFiles += valueProviders.statistics.getAndResetTickerCount(TickerType.NO_FILE_OPENS)
-                - valueProviders.statistics.getAndResetTickerCount(TickerType.NO_FILE_CLOSES);
-            numberOfFileErrors += valueProviders.statistics.getAndResetTickerCount(TickerType.NO_FILE_ERRORS);
+        if (storeToValueProviders.values().stream().anyMatch(valueProviders -> valueProviders.statistics != null)) {

Review comment:
       I think it's implied in `addValueProviders` that if any of the valueProviders' statistics are non-null, then they are all non-null, in which case, this makes sense as a guard. Still, it's kind of subtle.
   
   Why not just put a guard inside the loop instead to `continue` or `break` if it turns out that `valueProviders.statistics == null`?
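The per-iteration guard suggested above could look like the following sketch. `TickerSource` is a hypothetical interface standing in for `org.rocksdb.Statistics#getAndResetTickerCount`, ticker names are plain strings instead of RocksDB's `TickerType` enum, and only one of the counters from the diff is aggregated.

```java
import java.util.Collection;

// Hypothetical stand-in for org.rocksdb.Statistics.
interface TickerSource {
    long getAndResetTickerCount(String ticker);
}

// Hypothetical stand-in for the per-store value providers; statistics may be null,
// e.g. when the recording level is INFO.
final class ValueProviders {
    final TickerSource statistics;

    ValueProviders(final TickerSource statistics) {
        this.statistics = statistics;
    }
}

final class TickerAggregation {
    private TickerAggregation() { }

    // Sums BYTES_WRITTEN over all stores, skipping stores without statistics
    // instead of guarding the whole loop with anyMatch(...).
    static long bytesWritten(final Collection<ValueProviders> providers) {
        long bytesWritten = 0;
        for (final ValueProviders valueProviders : providers) {
            if (valueProviders.statistics == null) {
                continue;
            }
            bytesWritten += valueProviders.statistics.getAndResetTickerCount("BYTES_WRITTEN");
        }
        return bytesWritten;
    }
}
```

Skipping per store keeps the loop correct even if the "all or none have statistics" invariant from `addValueProviders` were ever relaxed.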







[GitHub] [kafka] cadonna commented on a change in pull request #9098: KAFKA-9924: Prepare RocksDB and metrics for RocksDB properties recording

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #9098:
URL: https://github.com/apache/kafka/pull/9098#discussion_r463578816



##########
File path: streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
##########
@@ -227,7 +228,8 @@ public MockProcessorContext(final Properties config, final TaskId taskId, final
         this.metrics = new StreamsMetricsImpl(
             new Metrics(metricConfig),
             threadId,
-            streamsConfig.getString(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG)
+            streamsConfig.getString(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG),
+            Time.SYSTEM

Review comment:
       I would prefer to postpone that, because currently it is not strictly needed and the time is only used in the RocksDB recording trigger that records only internal RocksDB metrics. I do not see how exposing time would be useful for users during testing. If anybody complains, we can still do it in a future KIP.
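To illustrate why the clock only matters to the recording trigger, here is a minimal sketch of a trigger that receives its clock through the constructor, so a test can pass a fixed time. It uses `java.time.Clock` as a stand-in for Kafka's `Time` interface; `RecordingTrigger` and the `LongConsumer` recorders are hypothetical and not the Streams classes.

```java
import java.time.Clock;
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongConsumer;

// Hypothetical trigger: each run passes the current wall-clock time (ms) to every
// registered recorder, mirroring a record(now) call.
final class RecordingTrigger implements Runnable {
    private final Clock clock;
    private final List<LongConsumer> recorders = new ArrayList<>();

    RecordingTrigger(final Clock clock) {
        this.clock = clock;   // production code would pass Clock.systemUTC()
    }

    void addRecorder(final LongConsumer recorder) {
        recorders.add(recorder);
    }

    @Override
    public void run() {
        final long now = clock.millis();
        for (final LongConsumer recorder : recorders) {
            recorder.accept(now);
        }
    }
}
```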







[GitHub] [kafka] cadonna commented on a change in pull request #9098: KAFKA-9924: Prepare RocksDB and metrics for RocksDB properties recording

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #9098:
URL: https://github.com/apache/kafka/pull/9098#discussion_r468802841



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
##########
@@ -204,17 +204,17 @@ private void maybeSetUpStatistics(final Map<String, Object> configs) {
         }
     }
 
-    private void addValueProvidersToMetricsRecorder(final Map<String, Object> configs) {
+    private void addValueProvidersToMetricsRecorder() {
         final TableFormatConfig tableFormatConfig = userSpecifiedOptions.tableFormatConfig();
         final Statistics statistics = userSpecifiedStatistics ? null : userSpecifiedOptions.statistics();
-        if (tableFormatConfig instanceof BlockBasedTableConfigWithAccessibleCache) {
-            final Cache cache = ((BlockBasedTableConfigWithAccessibleCache) tableFormatConfig).blockCache();
-            metricsRecorder.addValueProviders(name, db, cache, statistics);
-        } else {
-            metricsRecorder.addValueProviders(name, db, null, statistics);
-            log.warn("A table format configuration is used that does not expose the block cache. This means " +
-                "that metrics that relate to the block cache may be wrong if the block cache is shared.");
-        }
+        if (!(tableFormatConfig instanceof BlockBasedTableConfigWithAccessibleCache)) {

Review comment:
       No, it was not intentional. I simply did not think hard enough.  
   I agree with you that we should also allow `PlainTableConfig`. If users use the PlainTable format, they should already expect that block-cache-specific metrics do not show up or report a constant zero.
   Thank you for pointing this out!
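Following this exchange, the type check could distinguish three cases instead of rejecting every config that is not the accessible-cache wrapper. The sketch uses hypothetical stand-in types (`TableFormat`, `BlockBased`, `AccessibleCacheBlockBased`, `PlainTable`) and returns the cache, or `null`, that would be handed to `addValueProviders`; the warning text is illustrative.

```java
import java.util.logging.Logger;

// Hypothetical stand-ins for RocksDB's table format configurations.
interface TableFormat { }

final class PlainTable implements TableFormat { }

class BlockBased implements TableFormat { }

final class AccessibleCacheBlockBased extends BlockBased {
    private final Object cache;

    AccessibleCacheBlockBased(final Object cache) {
        this.cache = cache;
    }

    Object blockCache() {
        return cache;
    }
}

final class CacheResolver {
    private static final Logger LOG = Logger.getLogger(CacheResolver.class.getName());

    private CacheResolver() { }

    // Returns the block cache to pass to the metrics recorder, or null if none is known.
    static Object resolveCache(final TableFormat format) {
        if (format instanceof AccessibleCacheBlockBased) {
            return ((AccessibleCacheBlockBased) format).blockCache();
        }
        if (format instanceof BlockBased) {
            // A cache exists but is not visible; shared-cache metrics may be wrong.
            LOG.warning("Block-based table config does not expose the block cache.");
            return null;
        }
        // Plain table or another non-block-based format: there is no block cache,
        // so block cache metrics are expected to be absent or constant zero.
        return null;
    }
}
```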







[GitHub] [kafka] vvcephei commented on pull request #9098: KAFKA-9924: Prepare RocksDB and metrics for RocksDB properties recording

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #9098:
URL: https://github.com/apache/kafka/pull/9098#issuecomment-672975608


   Test this please





[GitHub] [kafka] cadonna commented on a change in pull request #9098: KAFKA-9924: Prepare RocksDB and metrics for RocksDB properties recording

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #9098:
URL: https://github.com/apache/kafka/pull/9098#discussion_r463617899



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
##########
@@ -181,23 +181,39 @@ void openDB(final ProcessorContext context) {
             throw new ProcessorStateException(fatal);
         }
 
-        // Setup metrics before the database is opened, otherwise the metrics are not updated
+        // Setup statistics before the database is opened, otherwise the statistics are not updated
         // with the measurements from Rocks DB
-        maybeSetUpMetricsRecorder(configs);
+        maybeSetUpStatistics(configs);
 
         openRocksDB(dbOptions, columnFamilyOptions);
         open = true;
+
+        addValueProvidersToMetricsRecorder(configs);
     }
 
-    private void maybeSetUpMetricsRecorder(final Map<String, Object> configs) {
-        if (userSpecifiedOptions.statistics() == null &&
+    private void maybeSetUpStatistics(final Map<String, Object> configs) {
+        if (userSpecifiedOptions.statistics() != null) {
+            userSpecifiedStatistics = true;
+        }
+        if (!userSpecifiedStatistics &&
             RecordingLevel.forName((String) configs.get(METRICS_RECORDING_LEVEL_CONFIG)) == RecordingLevel.DEBUG) {
 
-            isStatisticsRegistered = true;
             // metrics recorder will clean up statistics object
             final Statistics statistics = new Statistics();
             userSpecifiedOptions.setStatistics(statistics);
-            metricsRecorder.addStatistics(name, statistics);
+        }
+    }
+
+    private void addValueProvidersToMetricsRecorder(final Map<String, Object> configs) {
+        final TableFormatConfig tableFormatConfig = userSpecifiedOptions.tableFormatConfig();
+        final Statistics statistics = userSpecifiedStatistics ? null : userSpecifiedOptions.statistics();
+        if (tableFormatConfig instanceof BlockBasedTableConfigWithAccessibleCache) {
+            final Cache cache = ((BlockBasedTableConfigWithAccessibleCache) tableFormatConfig).blockCache();
+            metricsRecorder.addValueProviders(name, db, cache, statistics);
+        } else {
+            metricsRecorder.addValueProviders(name, db, null, statistics);
+            log.warn("A table format configuration is used that does not expose the block cache. This means " +
+                "that metrics that relate to the block cache may be wrong if the block cache is shared.");
         }

Review comment:
       I agree with you that it is not ideal and thank you for this lesson on reflection. 
   
   Indeed, I do not like reflection in this case, because it makes the code too dependent on RocksDB internals. We should use reflection to check if the public API to configure RocksDB changed in a newer version, but that is another story.
   
   I do not understand how the alternative of wrapping `BlockBasedTableConfig` into `BlockBasedTableConfigWithAccessibleCache` should work. Since the cache is not accessible in `BlockBasedTableConfig`, it will also not be accessible when it is wrapped in `BlockBasedTableConfigWithAccessibleCache` (despite the name). We need to get the reference to the cache when the cache is set in `BlockBasedTableConfig`. If the cache is already set, we can only use reflection.
   
   Since the block-based table format is the only format in RocksDB that uses the cache, I do not see why a user would absolutely need to pass a new `BlockBasedTableConfig` object. I think for now it is OK to log a warning and clearly document that the provided `BlockBasedTableConfig` object should be used.







[GitHub] [kafka] cadonna commented on a change in pull request #9098: KAFKA-9924: Prepare RocksDB and metrics for RocksDB properties recording

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #9098:
URL: https://github.com/apache/kafka/pull/9098#discussion_r469422110



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
##########
@@ -181,26 +181,42 @@ void openDB(final ProcessorContext context) {
             throw new ProcessorStateException(fatal);
         }
 
-        // Setup metrics before the database is opened, otherwise the metrics are not updated
+        // Setup statistics before the database is opened, otherwise the statistics are not updated
         // with the measurements from Rocks DB
-        maybeSetUpMetricsRecorder(configs);
+        maybeSetUpStatistics(configs);
 
         openRocksDB(dbOptions, columnFamilyOptions);
         open = true;
+
+        addValueProvidersToMetricsRecorder();
     }
 
-    private void maybeSetUpMetricsRecorder(final Map<String, Object> configs) {
-        if (userSpecifiedOptions.statistics() == null &&
+    private void maybeSetUpStatistics(final Map<String, Object> configs) {
+        if (userSpecifiedOptions.statistics() != null) {

Review comment:
       Actually, Streams would provide the built-in metrics, but not update them. I also thought about not providing them at all, but that is a bit tricky and I did not want to spend too much time on such a rare case. I will open a Jira and defer this to later.







[GitHub] [kafka] vvcephei commented on a change in pull request #9098: KAFKA-9924: Prepare RocksDB and metrics for RocksDB properties recording

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #9098:
URL: https://github.com/apache/kafka/pull/9098#discussion_r463128398



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
##########
@@ -181,23 +181,39 @@ void openDB(final ProcessorContext context) {
             throw new ProcessorStateException(fatal);
         }
 
-        // Setup metrics before the database is opened, otherwise the metrics are not updated
+        // Setup statistics before the database is opened, otherwise the statistics are not updated
         // with the measurements from Rocks DB
-        maybeSetUpMetricsRecorder(configs);
+        maybeSetUpStatistics(configs);
 
         openRocksDB(dbOptions, columnFamilyOptions);
         open = true;
+
+        addValueProvidersToMetricsRecorder(configs);
     }
 
-    private void maybeSetUpMetricsRecorder(final Map<String, Object> configs) {
-        if (userSpecifiedOptions.statistics() == null &&
+    private void maybeSetUpStatistics(final Map<String, Object> configs) {
+        if (userSpecifiedOptions.statistics() != null) {
+            userSpecifiedStatistics = true;
+        }
+        if (!userSpecifiedStatistics &&
             RecordingLevel.forName((String) configs.get(METRICS_RECORDING_LEVEL_CONFIG)) == RecordingLevel.DEBUG) {
 
-            isStatisticsRegistered = true;
             // metrics recorder will clean up statistics object
             final Statistics statistics = new Statistics();
             userSpecifiedOptions.setStatistics(statistics);
-            metricsRecorder.addStatistics(name, statistics);
+        }
+    }
+
+    private void addValueProvidersToMetricsRecorder(final Map<String, Object> configs) {
+        final TableFormatConfig tableFormatConfig = userSpecifiedOptions.tableFormatConfig();
+        final Statistics statistics = userSpecifiedStatistics ? null : userSpecifiedOptions.statistics();
+        if (tableFormatConfig instanceof BlockBasedTableConfigWithAccessibleCache) {
+            final Cache cache = ((BlockBasedTableConfigWithAccessibleCache) tableFormatConfig).blockCache();
+            metricsRecorder.addValueProviders(name, db, cache, statistics);
+        } else {
+            metricsRecorder.addValueProviders(name, db, null, statistics);
+            log.warn("A table format configuration is used that does not expose the block cache. This means " +
+                "that metrics that relate to the block cache may be wrong if the block cache is shared.");
         }

Review comment:
       Ah, after reading your test, I now see the issue. I'd overlooked the fact that users would independently construct the table config object AND the cache. I see now that this makes it impossible to reliably capture the cache, since users have to actually choose to pass our special table config to the Options and then pass the Cache to that table config.
   
   This doesn't seem ideal. What do you think about just using reflection instead?
   
   ```suggestion
           if (tableFormatConfig instanceof BlockBasedTableConfig) {
               final BlockBasedTableConfig blockBasedTableConfig = (BlockBasedTableConfig) tableFormatConfig;
               try {
                   final Field blockCacheField = BlockBasedTableConfig.class.getDeclaredField("blockCache_");
                   blockCacheField.setAccessible(true);
                   final Cache nullableBlockCache = (Cache) blockCacheField.get(blockBasedTableConfig);
                   metricsRecorder.addValueProviders(name, db, nullableBlockCache, statistics);
               } catch (final NoSuchFieldException | IllegalAccessException | ClassCastException e) {
                   log.warn("Expected to find and access field 'blockCache_' in BlockBasedTableConfig. " +
                                "Probably, an incompatible version of RocksDB is being used. " +
                                "Cache will be missing from memory metrics.", e);
                   metricsRecorder.addValueProviders(name, db, null, statistics);
               }
           } else {
               metricsRecorder.addValueProviders(name, db, null, statistics);
           }
   ```
   
   We would obviously test all the branches here to de-risk the reflection. We can also add a test that searches the classpath for implementations of TableFormatConfig to ensure we don't miss the memo if RocksDB adds a new TableFormatConfig implementation.
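   The branches of the reflective lookup can be exercised without RocksDB on the classpath. The following is a minimal, self-contained sketch; `ThirdPartyTableConfig`, `RenamedFieldTableConfig`, and `ReflectiveFieldReader` are hypothetical stand-ins, not RocksDB or Kafka classes. It shows the lookup succeeding when the private field exists and degrading to `null` when the field has been renamed or holds an unexpected type.

   ```java
   import java.lang.reflect.Field;

   // Hypothetical stand-in for a third-party config class that keeps its cache
   // in a private field without a getter, like 'blockCache_' above.
   class ThirdPartyTableConfig {
       private Object blockCache_;

       ThirdPartyTableConfig setBlockCache(final Object cache) {
           this.blockCache_ = cache;
           return this;
       }
   }

   // Hypothetical stand-in for a later version of the same class in which the
   // private field was renamed, so the reflective lookup has to fail softly.
   class RenamedFieldTableConfig {
       private Object cache;
   }

   final class ReflectiveFieldReader {
       private ReflectiveFieldReader() { }

       // Reads a private field declared on 'declaringClass' and casts it to 'fieldType'.
       // Returns null instead of throwing if the field is missing, inaccessible, or of
       // an unexpected type, so the caller can fall back to metrics without a cache.
       static <T> T readOrNull(final Class<?> declaringClass,
                               final Object target,
                               final String fieldName,
                               final Class<T> fieldType) {
           try {
               final Field field = declaringClass.getDeclaredField(fieldName);
               field.setAccessible(true);
               return fieldType.cast(field.get(target));
           } catch (final NoSuchFieldException | IllegalAccessException
                   | ClassCastException | SecurityException e) {
               return null;
           }
       }
   }
   ```

   Returning `null` rather than rethrowing matches the suggestion above, where a failed lookup only drops the cache from the memory metrics instead of failing the store.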
   
   Alternative thought, if you don't like the reflection: we could _also_ subclass Options and override `org.rocksdb.Options#setTableFormatConfig` to check whether the passed `TableFormatConfig` is a `BlockBasedTableConfig` and, if so, have _us_ wrap it with `BlockBasedTableConfigWithAccessibleCache`.
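   The wrapper idea from the diff (`BlockBasedTableConfigWithAccessibleCache` exposing `blockCache()`) boils down to a subclass that overrides the cache setter and remembers its argument. A minimal sketch with hypothetical stand-in classes (`LibraryTableConfig`, `TableConfigWithAccessibleCache`, `OptionsSketch`, `CacheLookup`; not the real RocksDB or Kafka types) also shows the limitation described above: a config the user builds independently still hides its cache.

   ```java
   // Hypothetical stand-in for a library config class: the cache has a setter but no getter.
   class LibraryTableConfig {
       private Object blockCache;

       LibraryTableConfig setBlockCache(final Object cache) {
           this.blockCache = cache;
           return this;
       }
   }

   // Sketch of the accessible-cache wrapper: remember the cache, then delegate so
   // the library still receives it.
   class TableConfigWithAccessibleCache extends LibraryTableConfig {
       private Object accessibleBlockCache;

       @Override
       TableConfigWithAccessibleCache setBlockCache(final Object cache) {
           accessibleBlockCache = cache;
           super.setBlockCache(cache);
           return this;
       }

       Object blockCache() {
           return accessibleBlockCache;
       }
   }

   // Hypothetical stand-in for Options: it only records the table config it is given.
   class OptionsSketch {
       private LibraryTableConfig tableFormatConfig;

       OptionsSketch setTableFormatConfig(final LibraryTableConfig config) {
           this.tableFormatConfig = config;
           return this;
       }

       LibraryTableConfig tableFormatConfig() {
           return tableFormatConfig;
       }
   }

   final class CacheLookup {
       private CacheLookup() { }

       // The cache is only recoverable if the config was built as the wrapper.
       static Object cacheOf(final OptionsSketch options) {
           final LibraryTableConfig config = options.tableFormatConfig();
           if (config instanceof TableConfigWithAccessibleCache) {
               return ((TableConfigWithAccessibleCache) config).blockCache();
           }
           return null;
       }
   }
   ```

   Intercepting `setTableFormatConfig` would let Streams detect the second case, but in this sketch a wrapper created at that point would not see a cache that was already set on the user's object.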




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on pull request #9098: KAFKA-9924: Prepare RocksDB and metrics for RocksDB properties recording

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #9098:
URL: https://github.com/apache/kafka/pull/9098#issuecomment-672977276


   Test this please





[GitHub] [kafka] vvcephei commented on pull request #9098: KAFKA-9924: Prepare RocksDB and metrics for RocksDB properties recording

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #9098:
URL: https://github.com/apache/kafka/pull/9098#issuecomment-673090192


   Test this please





[GitHub] [kafka] vvcephei commented on a change in pull request #9098: KAFKA-9924: Prepare RocksDB and metrics for RocksDB properties recording

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #9098:
URL: https://github.com/apache/kafka/pull/9098#discussion_r468713975



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
##########
@@ -204,17 +204,17 @@ private void maybeSetUpStatistics(final Map<String, Object> configs) {
         }
     }
 
-    private void addValueProvidersToMetricsRecorder(final Map<String, Object> configs) {
+    private void addValueProvidersToMetricsRecorder() {
         final TableFormatConfig tableFormatConfig = userSpecifiedOptions.tableFormatConfig();
         final Statistics statistics = userSpecifiedStatistics ? null : userSpecifiedOptions.statistics();
-        if (tableFormatConfig instanceof BlockBasedTableConfigWithAccessibleCache) {
-            final Cache cache = ((BlockBasedTableConfigWithAccessibleCache) tableFormatConfig).blockCache();
-            metricsRecorder.addValueProviders(name, db, cache, statistics);
-        } else {
-            metricsRecorder.addValueProviders(name, db, null, statistics);
-            log.warn("A table format configuration is used that does not expose the block cache. This means " +
-                "that metrics that relate to the block cache may be wrong if the block cache is shared.");
-        }
+        if (!(tableFormatConfig instanceof BlockBasedTableConfigWithAccessibleCache)) {

Review comment:
       It seems like we would now be forbidding the use of `PlainTableConfig`. Is that intentional?
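   For comparison, the removed lines above accepted any table format and only fell back to a `null` cache. A minimal sketch of that lenient dispatch, with hypothetical stand-in types (`TableFormatSketch`, `PlainTableSketch`, `AccessibleCacheTableSketch`, `CacheForMetrics`), keeps a `PlainTableConfig`-like format usable while only the cache-related metrics lose their input.

   ```java
   interface TableFormatSketch { }

   // Stand-in for a format without a block cache, such as PlainTableConfig.
   final class PlainTableSketch implements TableFormatSketch { }

   // Stand-in for a block-based format whose cache can be read back.
   final class AccessibleCacheTableSketch implements TableFormatSketch {
       private final Object blockCache;

       AccessibleCacheTableSketch(final Object blockCache) {
           this.blockCache = blockCache;
       }

       Object blockCache() {
           return blockCache;
       }
   }

   final class CacheForMetrics {
       private CacheForMetrics() { }

       // Never rejects a table format: unknown formats yield null, and the caller
       // registers the value providers without a cache (optionally logging a warning).
       static Object of(final TableFormatSketch config) {
           if (config instanceof AccessibleCacheTableSketch) {
               return ((AccessibleCacheTableSketch) config).blockCache();
           }
           return null;
       }
   }
   ```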







[GitHub] [kafka] vvcephei commented on pull request #9098: KAFKA-9924: Prepare RocksDB and metrics for RocksDB properties recording

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #9098:
URL: https://github.com/apache/kafka/pull/9098#issuecomment-672975770


   Ok to test





[GitHub] [kafka] cadonna commented on pull request #9098: KAFKA-9924: Prepare RocksDB and metrics for RocksDB properties recording

Posted by GitBox <gi...@apache.org>.
cadonna commented on pull request #9098:
URL: https://github.com/apache/kafka/pull/9098#issuecomment-673493440


   Test this please





[GitHub] [kafka] vvcephei merged pull request #9098: KAFKA-9924: Prepare RocksDB and metrics for RocksDB properties recording

Posted by GitBox <gi...@apache.org>.
vvcephei merged pull request #9098:
URL: https://github.com/apache/kafka/pull/9098


   





[GitHub] [kafka] cadonna commented on pull request #9098: KAFKA-9924: Prepare RocksDB and metrics for RocksDB properties recording

Posted by GitBox <gi...@apache.org>.
cadonna commented on pull request #9098:
URL: https://github.com/apache/kafka/pull/9098#issuecomment-673328944


   Test this please





[GitHub] [kafka] guozhangwang commented on a change in pull request #9098: KAFKA-9924: Prepare RocksDB and metrics for RocksDB properties recording

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #9098:
URL: https://github.com/apache/kafka/pull/9098#discussion_r469388654



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
##########
@@ -181,26 +181,42 @@ void openDB(final ProcessorContext context) {
             throw new ProcessorStateException(fatal);
         }
 
-        // Setup metrics before the database is opened, otherwise the metrics are not updated
+        // Setup statistics before the database is opened, otherwise the statistics are not updated
         // with the measurements from Rocks DB
-        maybeSetUpMetricsRecorder(configs);
+        maybeSetUpStatistics(configs);
 
         openRocksDB(dbOptions, columnFamilyOptions);
         open = true;
+
+        addValueProvidersToMetricsRecorder();
     }
 
-    private void maybeSetUpMetricsRecorder(final Map<String, Object> configs) {
-        if (userSpecifiedOptions.statistics() == null &&
+    private void maybeSetUpStatistics(final Map<String, Object> configs) {
+        if (userSpecifiedOptions.statistics() != null) {

Review comment:
       Hmm, that means that if a user calls `setStatistics` externally, Streams itself would not provide the built-in metrics, is that right? I was not aware of this; if it is by design, that's fine.
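   The behavior in question can be sketched as follows, assuming that the truncated condition in the old `maybeSetUpMetricsRecorder` means Streams creates its own statistics object only at the DEBUG recording level. `SketchRecordingLevel` and `StoreStatisticsSetup` are hypothetical; `statisticsForRecorder()` mirrors the diff line `userSpecifiedStatistics ? null : userSpecifiedOptions.statistics()`.

   ```java
   enum SketchRecordingLevel { INFO, DEBUG }

   final class StoreStatisticsSetup {
       private Object statisticsOnOptions;      // stand-in for Options#statistics()
       private boolean userSpecifiedStatistics;

       StoreStatisticsSetup(final Object userProvidedStatistics) {
           this.statisticsOnOptions = userProvidedStatistics;
       }

       // Runs before the DB is opened, so RocksDB sees the statistics object.
       void maybeSetUpStatistics(final SketchRecordingLevel level) {
           if (statisticsOnOptions != null) {
               userSpecifiedStatistics = true;      // the user owns it; leave it alone
           } else if (level == SketchRecordingLevel.DEBUG) {
               statisticsOnOptions = new Object();  // assumption: Streams creates one only at DEBUG
           }
       }

       // Runs after the DB is opened: a user-owned statistics object is not
       // handed to the Streams metrics recorder.
       Object statisticsForRecorder() {
           return userSpecifiedStatistics ? null : statisticsOnOptions;
       }
   }
   ```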







[GitHub] [kafka] cadonna commented on pull request #9098: KAFKA-9924: Prepare RocksDB and metrics for RocksDB properties recording

Posted by GitBox <gi...@apache.org>.
cadonna commented on pull request #9098:
URL: https://github.com/apache/kafka/pull/9098#issuecomment-665616943


   Call for review: @vvcephei @guozhangwang 





[GitHub] [kafka] vvcephei commented on pull request #9098: KAFKA-9924: Prepare RocksDB and metrics for RocksDB properties recording

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #9098:
URL: https://github.com/apache/kafka/pull/9098#issuecomment-673582389


   Test this please





[GitHub] [kafka] vvcephei commented on pull request #9098: KAFKA-9924: Prepare RocksDB and metrics for RocksDB properties recording

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #9098:
URL: https://github.com/apache/kafka/pull/9098#issuecomment-673661633


   Looks like the tests never actually ran. I'm running them locally now.





[GitHub] [kafka] guozhangwang commented on a change in pull request #9098: KAFKA-9924: Prepare RocksDB and metrics for RocksDB properties recording

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #9098:
URL: https://github.com/apache/kafka/pull/9098#discussion_r468891122



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
##########
@@ -181,26 +181,42 @@ void openDB(final ProcessorContext context) {
             throw new ProcessorStateException(fatal);
         }
 
-        // Setup metrics before the database is opened, otherwise the metrics are not updated
+        // Setup statistics before the database is opened, otherwise the statistics are not updated
         // with the measurements from Rocks DB
-        maybeSetUpMetricsRecorder(configs);
+        maybeSetUpStatistics(configs);
 
         openRocksDB(dbOptions, columnFamilyOptions);
         open = true;
+
+        addValueProvidersToMetricsRecorder();
     }
 
-    private void maybeSetUpMetricsRecorder(final Map<String, Object> configs) {
-        if (userSpecifiedOptions.statistics() == null &&
+    private void maybeSetUpStatistics(final Map<String, Object> configs) {
+        if (userSpecifiedOptions.statistics() != null) {

Review comment:
       I was thinking that, if we know `maybeSetUpStatistics` is called only once in the lifetime of the store, we could check here that `userSpecifiedOptions.statistics() == null` and otherwise throw an `IllegalStateException`. Then, in `addValueProvidersToMetricsRecorder`, we would check that `userSpecifiedOptions.statistics() != null` and otherwise throw an `IllegalStateException`. This would make the call-trace assumptions more explicit.
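   One way to make the call-order assumptions explicit without rejecting a user-provided statistics object is to guard the sequence itself rather than the nullness of the statistics. A minimal sketch; `OpenSequenceGuard` is a hypothetical helper, and its method names only mirror those in the diff.

   ```java
   final class OpenSequenceGuard {
       private boolean statisticsSetUp = false;
       private boolean valueProvidersAdded = false;

       // Must run exactly once, before the DB is opened.
       void maybeSetUpStatistics() {
           if (statisticsSetUp) {
               throw new IllegalStateException("maybeSetUpStatistics() must be called only once per store");
           }
           statisticsSetUp = true;
       }

       // Must run once, after the statistics were set up and the DB was opened.
       void addValueProvidersToMetricsRecorder() {
           if (!statisticsSetUp) {
               throw new IllegalStateException("statistics must be set up before value providers are added");
           }
           if (valueProvidersAdded) {
               throw new IllegalStateException("value providers were already added");
           }
           valueProvidersAdded = true;
       }
   }
   ```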







[GitHub] [kafka] vvcephei commented on pull request #9098: KAFKA-9924: Prepare RocksDB and metrics for RocksDB properties recording

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #9098:
URL: https://github.com/apache/kafka/pull/9098#issuecomment-673659166


   Retest this please





[GitHub] [kafka] vvcephei commented on pull request #9098: KAFKA-9924: Prepare RocksDB and metrics for RocksDB properties recording

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #9098:
URL: https://github.com/apache/kafka/pull/9098#issuecomment-673582467


   \o/





[GitHub] [kafka] cadonna commented on a change in pull request #9098: KAFKA-9924: Prepare RocksDB and metrics for RocksDB properties recording

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #9098:
URL: https://github.com/apache/kafka/pull/9098#discussion_r468796333



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
##########
@@ -181,26 +181,42 @@ void openDB(final ProcessorContext context) {
             throw new ProcessorStateException(fatal);
         }
 
-        // Setup metrics before the database is opened, otherwise the metrics are not updated
+        // Setup statistics before the database is opened, otherwise the statistics are not updated
         // with the measurements from Rocks DB
-        maybeSetUpMetricsRecorder(configs);
+        maybeSetUpStatistics(configs);
 
         openRocksDB(dbOptions, columnFamilyOptions);
         open = true;
+
+        addValueProvidersToMetricsRecorder();
     }
 
-    private void maybeSetUpMetricsRecorder(final Map<String, Object> configs) {
-        if (userSpecifiedOptions.statistics() == null &&
+    private void maybeSetUpStatistics(final Map<String, Object> configs) {
+        if (userSpecifiedOptions.statistics() != null) {

Review comment:
       The statistics are `null` when they are not set in the options. If they were not `null`, the unit test `RocksDBTest#shouldAddValueProvidersWithStatisticsToInjectedMetricsRecorderWhenRecordingLevelDebug()` would fail. I also verified this with a debug run.







[GitHub] [kafka] vvcephei commented on pull request #9098: KAFKA-9924: Prepare RocksDB and metrics for RocksDB properties recording

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #9098:
URL: https://github.com/apache/kafka/pull/9098#issuecomment-673090023


   Test this please





[GitHub] [kafka] vvcephei commented on pull request #9098: KAFKA-9924: Prepare RocksDB and metrics for RocksDB properties recording

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #9098:
URL: https://github.com/apache/kafka/pull/9098#issuecomment-672912500


   Ok to test





[GitHub] [kafka] cadonna commented on a change in pull request #9098: KAFKA-9924: Prepare RocksDB and metrics for RocksDB properties recording

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #9098:
URL: https://github.com/apache/kafka/pull/9098#discussion_r469095757



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
##########
@@ -181,26 +181,42 @@ void openDB(final ProcessorContext context) {
             throw new ProcessorStateException(fatal);
         }
 
-        // Setup metrics before the database is opened, otherwise the metrics are not updated
+        // Setup statistics before the database is opened, otherwise the statistics are not updated
         // with the measurements from Rocks DB
-        maybeSetUpMetricsRecorder(configs);
+        maybeSetUpStatistics(configs);
 
         openRocksDB(dbOptions, columnFamilyOptions);
         open = true;
+
+        addValueProvidersToMetricsRecorder();
     }
 
-    private void maybeSetUpMetricsRecorder(final Map<String, Object> configs) {
-        if (userSpecifiedOptions.statistics() == null &&
+    private void maybeSetUpStatistics(final Map<String, Object> configs) {
+        if (userSpecifiedOptions.statistics() != null) {

Review comment:
       I see what you are aiming at. However, why should we throw an `IllegalStateException` when `userSpecifiedOptions.statistics() != null`? The user could provide a statistics object through the config setter and use it to read the statistics outside our metrics framework. That pattern was supported before, and neither this PR nor the PR that introduced the statistics-based RocksDB metrics changed it.
   Anyway, it is good that you mentioned this, because when I re-checked the code in the metrics recorder I found that a `null` check was missing. I also added checks to ensure that either all statistics of a metrics recorder are `null` or none of them are.
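   The all-or-none rule can be sketched as follows, assuming one recorder may receive value providers from several RocksDB instances. `RecorderSketch` is hypothetical and stores plain `Object`s in place of RocksDB `Statistics`.

   ```java
   import java.util.LinkedHashMap;
   import java.util.Map;

   final class RecorderSketch {
       // Insertion-ordered; null values are allowed and mean "no statistics".
       private final Map<String, Object> statisticsByInstance = new LinkedHashMap<>();

       void addValueProviders(final String instanceName, final Object statistics) {
           if (statisticsByInstance.containsKey(instanceName)) {
               throw new IllegalStateException("Value providers for " + instanceName + " were already added");
           }
           if (!statisticsByInstance.isEmpty()) {
               final boolean existingAreNull = statisticsByInstance.values().iterator().next() == null;
               if (existingAreNull != (statistics == null)) {
                   // Mixing null and non-null statistics would make aggregated metrics misleading.
                   throw new IllegalStateException("Statistics for " + instanceName
                       + " must be " + (existingAreNull ? "null" : "non-null")
                       + " like those of the other instances");
               }
           }
           statisticsByInstance.put(instanceName, statistics);
       }

       boolean recordsStatistics() {
           return !statisticsByInstance.isEmpty()
               && statisticsByInstance.values().iterator().next() != null;
       }
   }
   ```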







[GitHub] [kafka] vvcephei commented on pull request #9098: KAFKA-9924: Prepare RocksDB and metrics for RocksDB properties recording

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #9098:
URL: https://github.com/apache/kafka/pull/9098#issuecomment-673672230


   Build passed:
   ```
   $ test-kafka :streams:testAll
   ...
   BUILD SUCCESSFUL in 9m 57s
   94 actionable tasks: 94 executed
   ```





[GitHub] [kafka] vvcephei commented on pull request #9098: KAFKA-9924: Prepare RocksDB and metrics for RocksDB properties recording

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #9098:
URL: https://github.com/apache/kafka/pull/9098#issuecomment-673090128


   Test this please





[GitHub] [kafka] vvcephei commented on pull request #9098: KAFKA-9924: Prepare RocksDB and metrics for RocksDB properties recording

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #9098:
URL: https://github.com/apache/kafka/pull/9098#issuecomment-672977429


   Well, I guess I'll just run them locally.

