You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ab...@apache.org on 2022/12/08 02:26:24 UTC
[kafka] branch trunk updated: KAFKA-14432: RocksDBStore relies on finalizers to not leak memory (#12935)
This is an automated email from the ASF dual-hosted git repository.
ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 36a2f7bfd08 KAFKA-14432: RocksDBStore relies on finalizers to not leak memory (#12935)
36a2f7bfd08 is described below
commit 36a2f7bfd0827158b085a6a12b61cfc1a54b4c1a
Author: Lucas Brutschy <lu...@users.noreply.github.com>
AuthorDate: Thu Dec 8 03:25:58 2022 +0100
KAFKA-14432: RocksDBStore relies on finalizers to not leak memory (#12935)
RocksDBStore relied on finalizers to avoid leaking memory (and leaked memory after the upgrade to RocksDB 7).
The problem was that every call to options.statistics() creates a new wrapper object that needs to be finalized.
I simplified the logic a bit and moved the ownership of the statistics from ValueProvider to RocksDBStore.
Reviewers: Bruno Cadonna <ca...@apache.org>, Anna Sophie Blee-Goldman <ab...@apache.org>, Christo Lolov <lo...@amazon.com>
---
.../streams/state/internals/RocksDBStore.java | 31 ++++++-----
.../internals/metrics/RocksDBMetricsRecorder.java | 7 ---
.../streams/state/internals/RocksDBStoreTest.java | 62 ++++++++++++++++++++--
.../metrics/RocksDBMetricsRecorderTest.java | 14 -----
4 files changed, 76 insertions(+), 38 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index 3b0ac605f75..edba07acb8c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -108,6 +108,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
FlushOptions fOptions;
private Cache cache;
private BloomFilter filter;
+ private Statistics statistics;
private RocksDBConfigSetter configSetter;
private boolean userSpecifiedStatistics = false;
@@ -229,40 +230,38 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
// Setup statistics before the database is opened, otherwise the statistics are not updated
// with the measurements from Rocks DB
- maybeSetUpStatistics(configs);
-
+ setupStatistics(configs, dbOptions);
openRocksDB(dbOptions, columnFamilyOptions);
open = true;
addValueProvidersToMetricsRecorder();
}
- private void maybeSetUpStatistics(final Map<String, Object> configs) {
- if (userSpecifiedOptions.statistics() != null) {
+ private void setupStatistics(final Map<String, Object> configs, final DBOptions dbOptions) {
+ statistics = userSpecifiedOptions.statistics();
+ if (statistics == null) {
+ if (RecordingLevel.forName((String) configs.get(METRICS_RECORDING_LEVEL_CONFIG)) == RecordingLevel.DEBUG) {
+ statistics = new Statistics();
+ dbOptions.setStatistics(statistics);
+ }
+ userSpecifiedStatistics = false;
+ } else {
userSpecifiedStatistics = true;
}
- if (!userSpecifiedStatistics &&
- RecordingLevel.forName((String) configs.get(METRICS_RECORDING_LEVEL_CONFIG)) == RecordingLevel.DEBUG) {
-
- // metrics recorder will clean up statistics object
- final Statistics statistics = new Statistics();
- userSpecifiedOptions.setStatistics(statistics);
- }
}
private void addValueProvidersToMetricsRecorder() {
final TableFormatConfig tableFormatConfig = userSpecifiedOptions.tableFormatConfig();
- final Statistics statistics = userSpecifiedStatistics ? null : userSpecifiedOptions.statistics();
if (tableFormatConfig instanceof BlockBasedTableConfigWithAccessibleCache) {
final Cache cache = ((BlockBasedTableConfigWithAccessibleCache) tableFormatConfig).blockCache();
- metricsRecorder.addValueProviders(name, db, cache, statistics);
+ metricsRecorder.addValueProviders(name, db, cache, userSpecifiedStatistics ? null : statistics);
} else if (tableFormatConfig instanceof BlockBasedTableConfig) {
throw new ProcessorStateException("The used block-based table format configuration does not expose the " +
"block cache. Use the BlockBasedTableConfig instance provided by Options#tableFormatConfig() to configure " +
"the block-based table format of RocksDB. Do not provide a new instance of BlockBasedTableConfig to " +
"the RocksDB options.");
} else {
- metricsRecorder.addValueProviders(name, db, null, statistics);
+ metricsRecorder.addValueProviders(name, db, null, userSpecifiedStatistics ? null : statistics);
}
}
@@ -526,6 +525,9 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
fOptions.close();
filter.close();
cache.close();
+ if (statistics != null) {
+ statistics.close();
+ }
dbAccessor = null;
userSpecifiedOptions = null;
@@ -534,6 +536,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
db = null;
filter = null;
cache = null;
+ statistics = null;
}
private void closeOpenIterators() {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java
index 2b2580b135a..df68f2e8024 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java
@@ -79,12 +79,6 @@ public class RocksDBMetricsRecorder {
}
this.statistics = statistics;
}
-
- public void maybeCloseStatistics() {
- if (statistics != null) {
- statistics.close();
- }
- }
}
private static final String ROCKSDB_PROPERTIES_PREFIX = "rocksdb.";
@@ -411,7 +405,6 @@ public class RocksDBMetricsRecorder {
" could be found. This is a bug in Kafka Streams. " +
"Please open a bug report under https://issues.apache.org/jira/projects/KAFKA/issues");
}
- removedValueProviders.maybeCloseStatistics();
if (storeToValueProviders.isEmpty()) {
logger.debug(
"Removing metrics recorder for store {} of task {} from metrics recording trigger",
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
index 5622c85be0a..00b08d68ca1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.state.internals;
+import java.lang.reflect.Field;
import java.util.Optional;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -230,12 +231,18 @@ public class RocksDBStoreTest extends AbstractKeyValueStoreTest {
public RocksDBConfigSetterWithUserProvidedStatistics(){}
public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
- options.setStatistics(new Statistics());
+ lastStatistics = new Statistics();
+ options.setStatistics(lastStatistics);
}
public void close(final String storeName, final Options options) {
- options.statistics().close();
+ // We want to be in charge of closing our statistics ourselves.
+ assertTrue(lastStatistics.isOwningHandle());
+ lastStatistics.close();
+ lastStatistics = null;
}
+
+ protected static Statistics lastStatistics = null;
}
@Test
@@ -248,6 +255,49 @@ public class RocksDBStoreTest extends AbstractKeyValueStoreTest {
verify(metricsRecorder).addValueProviders(eq(DB_NAME), notNull(), notNull(), isNull());
}
+
+ @Test
+ public void shouldCloseStatisticsWhenUserProvidesStatistics() throws Exception {
+ rocksDBStore = getRocksDBStoreWithRocksDBMetricsRecorder();
+ context = getProcessorContext(RecordingLevel.DEBUG, RocksDBConfigSetterWithUserProvidedStatistics.class);
+
+ rocksDBStore.openDB(context.appConfigs(), context.stateDir());
+ final Statistics userStatistics = RocksDBConfigSetterWithUserProvidedStatistics.lastStatistics;
+ final Statistics statisticsHandle = getStatistics(rocksDBStore);
+ rocksDBStore.close();
+
+ // Both statistics handles must be closed now.
+ assertFalse(userStatistics.isOwningHandle());
+ assertFalse(statisticsHandle.isOwningHandle());
+ assertNull(getStatistics(rocksDBStore));
+ assertNull(RocksDBConfigSetterWithUserProvidedStatistics.lastStatistics);
+ }
+
+ @Test
+ public void shouldSetStatisticsInValueProvidersWhenUserProvidesNoStatistics() throws Exception {
+ rocksDBStore = getRocksDBStoreWithRocksDBMetricsRecorder();
+ context = getProcessorContext(RecordingLevel.DEBUG);
+
+ rocksDBStore.openDB(context.appConfigs(), context.stateDir());
+
+ verify(metricsRecorder).addValueProviders(eq(DB_NAME), notNull(), notNull(), eq(getStatistics(rocksDBStore)));
+ }
+
+ @Test
+ public void shouldCloseStatisticsWhenUserProvidesNoStatistics() throws Exception {
+ rocksDBStore = getRocksDBStoreWithRocksDBMetricsRecorder();
+ context = getProcessorContext(RecordingLevel.DEBUG);
+
+ rocksDBStore.openDB(context.appConfigs(), context.stateDir());
+ final Statistics statisticsHandle = getStatistics(rocksDBStore);
+ rocksDBStore.close();
+
+ // Statistics handles must be closed now.
+ assertFalse(statisticsHandle.isOwningHandle());
+ assertNull(getStatistics(rocksDBStore));
+ }
+
+
public static class RocksDBConfigSetterWithUserProvidedNewBlockBasedTableFormatConfig implements RocksDBConfigSetter {
public RocksDBConfigSetterWithUserProvidedNewBlockBasedTableFormatConfig(){}
@@ -1165,5 +1215,11 @@ public class RocksDBStoreTest extends AbstractKeyValueStoreTest {
return result;
}
-
+ private Statistics getStatistics(final RocksDBStore rocksDBStore) throws Exception {
+ final Field statisticsField = rocksDBStore.getClass().getDeclaredField("statistics");
+ statisticsField.setAccessible(true);
+ final Statistics statistics = (Statistics) statisticsField.get(rocksDBStore);
+ statisticsField.setAccessible(false);
+ return statistics;
+ }
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java
index 4ccfa2dfdcc..d1cb09157d7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java
@@ -346,20 +346,6 @@ public class RocksDBMetricsRecorderTest {
verifyNoMoreInteractions(recordingTrigger);
}
- @Test
- public void shouldCloseStatisticsWhenValueProvidersAreRemoved() {
- recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, statisticsToAdd1);
- recorder.removeValueProviders(SEGMENT_STORE_NAME_1);
- verify(statisticsToAdd1).close();
- }
-
- @Test
- public void shouldNotCloseStatisticsWhenValueProvidersWithoutStatisticsAreRemoved() {
- recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, null);
- recorder.removeValueProviders(SEGMENT_STORE_NAME_1);
- verify(statisticsToAdd1, never()).close();
- }
-
@Test
public void shouldNotRemoveItselfFromRecordingTriggerWhenAtLeastOneValueProviderIsPresent() {
recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, statisticsToAdd1);