You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/12/05 15:39:11 UTC

[GitHub] [kafka] cadonna commented on a diff in pull request #12935: KAFKA-14432: RocksDBStore relies on finalizers to not leak memory

cadonna commented on code in PR #12935:
URL: https://github.com/apache/kafka/pull/12935#discussion_r1039754929


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java:
##########
@@ -526,6 +521,9 @@ public synchronized void close() {
         fOptions.close();
         filter.close();
         cache.close();
+        if (statistics != null) {
+            statistics.close();
+        }
 

Review Comment:
   What happens if a user-specified statistics object is closed by user code?



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java:
##########
@@ -229,40 +230,34 @@ void openDB(final Map<String, Object> configs, final File stateDir) {
 
         // Setup statistics before the database is opened, otherwise the statistics are not updated
         // with the measurements from Rocks DB
-        maybeSetUpStatistics(configs);
-
+        statistics = userSpecifiedOptions.statistics();
+        if (statistics == null) {
+            if (RecordingLevel.forName((String) configs.get(METRICS_RECORDING_LEVEL_CONFIG)) == RecordingLevel.DEBUG) {
+                statistics = new Statistics();
+                dbOptions.setStatistics(statistics);
+            }
+            userSpecifiedStatistics = false;
+        } else {
+            userSpecifiedStatistics = true;
+        }

Review Comment:
   Could you extract this to a method? IMO, `openDB()` is already quite long.  



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java:
##########
@@ -346,20 +346,6 @@ public void shouldNotAddItselfToRecordingTriggerWhenNotEmpty() {
         verifyNoMoreInteractions(recordingTrigger);
     }
 
-    @Test
-    public void shouldCloseStatisticsWhenValueProvidersAreRemoved() {
-        recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, statisticsToAdd1);
-        recorder.removeValueProviders(SEGMENT_STORE_NAME_1);
-        verify(statisticsToAdd1).close();
-    }
-
-    @Test
-    public void shouldNotCloseStatisticsWhenValueProvidersWithoutStatisticsAreRemoved() {
-        recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, null);
-        recorder.removeValueProviders(SEGMENT_STORE_NAME_1);
-        verify(statisticsToAdd1, never()).close();
-    }

Review Comment:
   Is it possible to write some tests for `RocksDBStore` to ensure the statistics object is closed and the same instance is passed to the metrics recorder?  



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org