You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ab...@apache.org on 2022/12/08 02:26:24 UTC

[kafka] branch trunk updated: KAFKA-14432: RocksDBStore relies on finalizers to not leak memory (#12935)

This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 36a2f7bfd08 KAFKA-14432: RocksDBStore relies on finalizers to not leak memory (#12935)
36a2f7bfd08 is described below

commit 36a2f7bfd0827158b085a6a12b61cfc1a54b4c1a
Author: Lucas Brutschy <lu...@users.noreply.github.com>
AuthorDate: Thu Dec 8 03:25:58 2022 +0100

    KAFKA-14432: RocksDBStore relies on finalizers to not leak memory (#12935)
    
    RocksDBStore relied on finalizers to not leak memory (and leaked memory after the upgrade to RocksDB 7).
    The problem was that every call to options.statistics creates a new wrapper object that needs to be finalized.
    
    I simplified the logic a bit and moved the ownership of the statistics from ValueProvider to RocksDBStore.
    
    Reviewers: Bruno Cadonna <ca...@apache.org>, Anna Sophie Blee-Goldman <ab...@apache.org>, Christo Lolov <lo...@amazon.com>
---
 .../streams/state/internals/RocksDBStore.java      | 31 ++++++-----
 .../internals/metrics/RocksDBMetricsRecorder.java  |  7 ---
 .../streams/state/internals/RocksDBStoreTest.java  | 62 ++++++++++++++++++++--
 .../metrics/RocksDBMetricsRecorderTest.java        | 14 -----
 4 files changed, 76 insertions(+), 38 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index 3b0ac605f75..edba07acb8c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -108,6 +108,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
     FlushOptions fOptions;
     private Cache cache;
     private BloomFilter filter;
+    private Statistics statistics;
 
     private RocksDBConfigSetter configSetter;
     private boolean userSpecifiedStatistics = false;
@@ -229,40 +230,38 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
 
         // Setup statistics before the database is opened, otherwise the statistics are not updated
         // with the measurements from Rocks DB
-        maybeSetUpStatistics(configs);
-
+        setupStatistics(configs, dbOptions);
         openRocksDB(dbOptions, columnFamilyOptions);
         open = true;
 
         addValueProvidersToMetricsRecorder();
     }
 
-    private void maybeSetUpStatistics(final Map<String, Object> configs) {
-        if (userSpecifiedOptions.statistics() != null) {
+    private void setupStatistics(final Map<String, Object> configs, final DBOptions dbOptions) {
+        statistics = userSpecifiedOptions.statistics();
+        if (statistics == null) {
+            if (RecordingLevel.forName((String) configs.get(METRICS_RECORDING_LEVEL_CONFIG)) == RecordingLevel.DEBUG) {
+                statistics = new Statistics();
+                dbOptions.setStatistics(statistics);
+            }
+            userSpecifiedStatistics = false;
+        } else {
             userSpecifiedStatistics = true;
         }
-        if (!userSpecifiedStatistics &&
-                RecordingLevel.forName((String) configs.get(METRICS_RECORDING_LEVEL_CONFIG)) == RecordingLevel.DEBUG) {
-
-            // metrics recorder will clean up statistics object
-            final Statistics statistics = new Statistics();
-            userSpecifiedOptions.setStatistics(statistics);
-        }
     }
 
     private void addValueProvidersToMetricsRecorder() {
         final TableFormatConfig tableFormatConfig = userSpecifiedOptions.tableFormatConfig();
-        final Statistics statistics = userSpecifiedStatistics ? null : userSpecifiedOptions.statistics();
         if (tableFormatConfig instanceof BlockBasedTableConfigWithAccessibleCache) {
             final Cache cache = ((BlockBasedTableConfigWithAccessibleCache) tableFormatConfig).blockCache();
-            metricsRecorder.addValueProviders(name, db, cache, statistics);
+            metricsRecorder.addValueProviders(name, db, cache, userSpecifiedStatistics ? null : statistics);
         } else if (tableFormatConfig instanceof BlockBasedTableConfig) {
             throw new ProcessorStateException("The used block-based table format configuration does not expose the " +
                     "block cache. Use the BlockBasedTableConfig instance provided by Options#tableFormatConfig() to configure " +
                     "the block-based table format of RocksDB. Do not provide a new instance of BlockBasedTableConfig to " +
                     "the RocksDB options.");
         } else {
-            metricsRecorder.addValueProviders(name, db, null, statistics);
+            metricsRecorder.addValueProviders(name, db, null, userSpecifiedStatistics ? null : statistics);
         }
     }
 
@@ -526,6 +525,9 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
         fOptions.close();
         filter.close();
         cache.close();
+        if (statistics != null) {
+            statistics.close();
+        }
 
         dbAccessor = null;
         userSpecifiedOptions = null;
@@ -534,6 +536,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
         db = null;
         filter = null;
         cache = null;
+        statistics = null;
     }
 
     private void closeOpenIterators() {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java
index 2b2580b135a..df68f2e8024 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java
@@ -79,12 +79,6 @@ public class RocksDBMetricsRecorder {
             }
             this.statistics = statistics;
         }
-
-        public void maybeCloseStatistics() {
-            if (statistics != null) {
-                statistics.close();
-            }
-        }
     }
 
     private static final String ROCKSDB_PROPERTIES_PREFIX = "rocksdb.";
@@ -411,7 +405,6 @@ public class RocksDBMetricsRecorder {
                 " could be found. This is a bug in Kafka Streams. " +
                 "Please open a bug report under https://issues.apache.org/jira/projects/KAFKA/issues");
         }
-        removedValueProviders.maybeCloseStatistics();
         if (storeToValueProviders.isEmpty()) {
             logger.debug(
                 "Removing metrics recorder for store {} of task {} from metrics recording trigger",
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
index 5622c85be0a..00b08d68ca1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import java.lang.reflect.Field;
 import java.util.Optional;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -230,12 +231,18 @@ public class RocksDBStoreTest extends AbstractKeyValueStoreTest {
         public RocksDBConfigSetterWithUserProvidedStatistics(){}
 
         public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
-            options.setStatistics(new Statistics());
+            lastStatistics = new Statistics();
+            options.setStatistics(lastStatistics);
         }
 
         public void close(final String storeName, final Options options) {
-            options.statistics().close();
+            // We want to be in charge of closing our statistics ourselves.
+            assertTrue(lastStatistics.isOwningHandle());
+            lastStatistics.close();
+            lastStatistics = null;
         }
+
+        protected static Statistics lastStatistics = null;
     }
 
     @Test
@@ -248,6 +255,49 @@ public class RocksDBStoreTest extends AbstractKeyValueStoreTest {
         verify(metricsRecorder).addValueProviders(eq(DB_NAME), notNull(), notNull(), isNull());
     }
 
+
+    @Test
+    public void shouldCloseStatisticsWhenUserProvidesStatistics() throws Exception {
+        rocksDBStore = getRocksDBStoreWithRocksDBMetricsRecorder();
+        context = getProcessorContext(RecordingLevel.DEBUG, RocksDBConfigSetterWithUserProvidedStatistics.class);
+
+        rocksDBStore.openDB(context.appConfigs(), context.stateDir());
+        final Statistics userStatistics = RocksDBConfigSetterWithUserProvidedStatistics.lastStatistics;
+        final Statistics statisticsHandle = getStatistics(rocksDBStore);
+        rocksDBStore.close();
+
+        // Both statistics handles must be closed now.
+        assertFalse(userStatistics.isOwningHandle());
+        assertFalse(statisticsHandle.isOwningHandle());
+        assertNull(getStatistics(rocksDBStore));
+        assertNull(RocksDBConfigSetterWithUserProvidedStatistics.lastStatistics);
+    }
+
+    @Test
+    public void shouldSetStatisticsInValueProvidersWhenUserProvidesNoStatistics() throws Exception {
+        rocksDBStore = getRocksDBStoreWithRocksDBMetricsRecorder();
+        context = getProcessorContext(RecordingLevel.DEBUG);
+
+        rocksDBStore.openDB(context.appConfigs(), context.stateDir());
+
+        verify(metricsRecorder).addValueProviders(eq(DB_NAME), notNull(), notNull(), eq(getStatistics(rocksDBStore)));
+    }
+
+    @Test
+    public void shouldCloseStatisticsWhenUserProvidesNoStatistics() throws Exception {
+        rocksDBStore = getRocksDBStoreWithRocksDBMetricsRecorder();
+        context = getProcessorContext(RecordingLevel.DEBUG);
+
+        rocksDBStore.openDB(context.appConfigs(), context.stateDir());
+        final Statistics statisticsHandle = getStatistics(rocksDBStore);
+        rocksDBStore.close();
+
+        // Statistics handles must be closed now.
+        assertFalse(statisticsHandle.isOwningHandle());
+        assertNull(getStatistics(rocksDBStore));
+    }
+
+
     public static class RocksDBConfigSetterWithUserProvidedNewBlockBasedTableFormatConfig implements RocksDBConfigSetter {
         public RocksDBConfigSetterWithUserProvidedNewBlockBasedTableFormatConfig(){}
 
@@ -1165,5 +1215,11 @@ public class RocksDBStoreTest extends AbstractKeyValueStoreTest {
         return result;
     }
 
-
+    private Statistics getStatistics(final RocksDBStore rocksDBStore) throws Exception {
+        final Field statisticsField = rocksDBStore.getClass().getDeclaredField("statistics");
+        statisticsField.setAccessible(true);
+        final Statistics statistics = (Statistics) statisticsField.get(rocksDBStore);
+        statisticsField.setAccessible(false);
+        return statistics;
+    }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java
index 4ccfa2dfdcc..d1cb09157d7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java
@@ -346,20 +346,6 @@ public class RocksDBMetricsRecorderTest {
         verifyNoMoreInteractions(recordingTrigger);
     }
 
-    @Test
-    public void shouldCloseStatisticsWhenValueProvidersAreRemoved() {
-        recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, statisticsToAdd1);
-        recorder.removeValueProviders(SEGMENT_STORE_NAME_1);
-        verify(statisticsToAdd1).close();
-    }
-
-    @Test
-    public void shouldNotCloseStatisticsWhenValueProvidersWithoutStatisticsAreRemoved() {
-        recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, null);
-        recorder.removeValueProviders(SEGMENT_STORE_NAME_1);
-        verify(statisticsToAdd1, never()).close();
-    }
-
     @Test
     public void shouldNotRemoveItselfFromRecordingTriggerWhenAtLeastOneValueProviderIsPresent() {
         recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, statisticsToAdd1);