Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/07/30 16:48:24 UTC

[GitHub] [kafka] vvcephei commented on a change in pull request #9098: KAFKA-9924: Prepare RocksDB and metrics for RocksDB properties recording

vvcephei commented on a change in pull request #9098:
URL: https://github.com/apache/kafka/pull/9098#discussion_r463128398



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
##########
@@ -181,23 +181,39 @@ void openDB(final ProcessorContext context) {
             throw new ProcessorStateException(fatal);
         }
 
-        // Setup metrics before the database is opened, otherwise the metrics are not updated
+        // Setup statistics before the database is opened, otherwise the statistics are not updated
         // with the measurements from Rocks DB
-        maybeSetUpMetricsRecorder(configs);
+        maybeSetUpStatistics(configs);
 
         openRocksDB(dbOptions, columnFamilyOptions);
         open = true;
+
+        addValueProvidersToMetricsRecorder(configs);
     }
 
-    private void maybeSetUpMetricsRecorder(final Map<String, Object> configs) {
-        if (userSpecifiedOptions.statistics() == null &&
+    private void maybeSetUpStatistics(final Map<String, Object> configs) {
+        if (userSpecifiedOptions.statistics() != null) {
+            userSpecifiedStatistics = true;
+        }
+        if (!userSpecifiedStatistics &&
             RecordingLevel.forName((String) configs.get(METRICS_RECORDING_LEVEL_CONFIG)) == RecordingLevel.DEBUG) {
 
-            isStatisticsRegistered = true;
             // metrics recorder will clean up statistics object
             final Statistics statistics = new Statistics();
             userSpecifiedOptions.setStatistics(statistics);
-            metricsRecorder.addStatistics(name, statistics);
+        }
+    }
+
+    private void addValueProvidersToMetricsRecorder(final Map<String, Object> configs) {
+        final TableFormatConfig tableFormatConfig = userSpecifiedOptions.tableFormatConfig();
+        final Statistics statistics = userSpecifiedStatistics ? null : userSpecifiedOptions.statistics();
+        if (tableFormatConfig instanceof BlockBasedTableConfigWithAccessibleCache) {
+            final Cache cache = ((BlockBasedTableConfigWithAccessibleCache) tableFormatConfig).blockCache();
+            metricsRecorder.addValueProviders(name, db, cache, statistics);
+        } else {
+            metricsRecorder.addValueProviders(name, db, null, statistics);
+            log.warn("A table format configuration is used that does not expose the block cache. This means " +
+                "that metrics that relate to the block cache may be wrong if the block cache is shared.");
         }

Review comment:
      Ah, after reading your test, I now see the issue. I'd overlooked the fact that users construct the table config object AND the cache independently, which makes it impossible to reliably capture the cache: users would have to actively choose to pass our special table config to the Options and then pass the Cache to that table config.
   
   This doesn't seem ideal. What do you think about just using reflection instead?
   
   ```suggestion
           if (tableFormatConfig instanceof BlockBasedTableConfig) {
               final BlockBasedTableConfig blockBasedTableConfig = (BlockBasedTableConfig) tableFormatConfig;
               try {
                   final Field blockCacheField = BlockBasedTableConfig.class.getDeclaredField("blockCache_");
                   blockCacheField.setAccessible(true);
                   final Cache nullableBlockCache = (Cache) blockCacheField.get(blockBasedTableConfig);
                   metricsRecorder.addValueProviders(name, db, nullableBlockCache, statistics);
               } catch (final NoSuchFieldException | IllegalAccessException | ClassCastException e) {
                   log.warn("Expected to find and access field 'blockCache_' in BlockBasedTableConfig. " +
                                "Probably, an incompatible version of RocksDB is being used. " +
                                "Cache will be missing from memory metrics.", e);
               }
           } else {
               metricsRecorder.addValueProviders(name, db, null, statistics);
           }
   ```
   
   We would obviously test all the branches here to de-risk the reflection. We can also add a test that searches the classpath for implementations of TableFormatConfig to ensure we don't miss the memo if RocksDB adds a new TableFormatConfig implementation.
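
    For illustration, a rough sketch of that classpath test, assuming the `org.reflections` library is on the test classpath and that the RocksDB version in use ships exactly two `TableFormatConfig` implementations (`BlockBasedTableConfig` and `PlainTableConfig`); both are assumptions, not verified here:
    
    ```java
    import java.util.Set;
    
    import org.junit.Test;
    import org.reflections.Reflections;
    import org.rocksdb.TableFormatConfig;
    
    import static org.junit.Assert.assertEquals;
    
    public class TableFormatConfigImplementationsTest {
    
        @Test
        public void shouldNoticeNewTableFormatConfigImplementations() {
            // Scan the org.rocksdb package for every subtype of TableFormatConfig.
            final Set<Class<? extends TableFormatConfig>> implementations =
                new Reflections("org.rocksdb").getSubTypesOf(TableFormatConfig.class);
            // If this fails, RocksDB added or removed an implementation and the
            // instanceof branches above need to be revisited.
            assertEquals(2, implementations.size());
        }
    }
    ```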
   
    Alternative thought, if you don't like the reflection: we could _also_ subclass `Options` and override `org.rocksdb.Options#setTableFormatConfig` to check whether the passed `TableFormatConfig` is a `BlockBasedTableConfig`, and if so, then _we_ wrap it with `BlockBasedTableConfigWithAccessibleCache` before it reaches RocksDB.
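
    A minimal sketch of that alternative, for concreteness. The `wrap` factory method is hypothetical: `BlockBasedTableConfigWithAccessibleCache` would need some way to take over all settings of the user's config, which it doesn't have today:
    
    ```java
    import org.rocksdb.BlockBasedTableConfig;
    import org.rocksdb.Options;
    import org.rocksdb.TableFormatConfig;
    
    // Hypothetical Options subclass that intercepts the table format config so the
    // block cache stays reachable for metrics recording.
    public class OptionsWithInterceptedTableConfig extends Options {
    
        @Override
        public Options setTableFormatConfig(final TableFormatConfig tableFormatConfig) {
            if (tableFormatConfig instanceof BlockBasedTableConfig) {
                // 'wrap' is hypothetical; it would copy the settings of the passed
                // config into a BlockBasedTableConfigWithAccessibleCache.
                return super.setTableFormatConfig(
                    BlockBasedTableConfigWithAccessibleCache.wrap((BlockBasedTableConfig) tableFormatConfig));
            }
            return super.setTableFormatConfig(tableFormatConfig);
        }
    }
    ```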

##########
File path: streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
##########
@@ -227,7 +228,8 @@ public MockProcessorContext(final Properties config, final TaskId taskId, final
         this.metrics = new StreamsMetricsImpl(
             new Metrics(metricConfig),
             threadId,
-            streamsConfig.getString(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG)
+            streamsConfig.getString(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG),
+            Time.SYSTEM

Review comment:
       Interesting... Should we add a `Time` argument to the constructor? It would be a minor amendment to the KIP.
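
    For example, with a hypothetical `Time` parameter on the constructor (not part of the current API), a test could drive the metrics clock with a `MockTime`:
    
    ```java
    import java.io.File;
    import java.util.Properties;
    
    import org.apache.kafka.common.utils.MockTime;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.processor.MockProcessorContext;
    import org.apache.kafka.streams.processor.TaskId;
    import org.junit.Test;
    
    public class MockProcessorContextTimeTest {
    
        @Test
        public void shouldPassProvidedTimeToMetrics() {
            final Properties config = new Properties();
            config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-app");
            config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            // The fourth argument is the hypothetical Time; today the context
            // hard-codes Time.SYSTEM when it constructs StreamsMetricsImpl.
            final MockProcessorContext context = new MockProcessorContext(
                config, new TaskId(0, 0), new File("/tmp/state"), new MockTime());
        }
    }
    ```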




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org