You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by bb...@apache.org on 2019/05/09 22:45:37 UTC
[kafka] branch trunk updated: KAFKA-8324: Add close() method to
RocksDBConfigSetter (#6697)
This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b2826c6 KAFKA-8324: Add close() method to RocksDBConfigSetter (#6697)
b2826c6 is described below
commit b2826c6c2bfc3360913e63e0b65f9d79e782fd50
Author: A. Sophie Blee-Goldman <so...@confluent.io>
AuthorDate: Thu May 9 15:45:22 2019 -0700
KAFKA-8324: Add close() method to RocksDBConfigSetter (#6697)
Following KIP-453, this PR adds a default close() method to the RocksDBConfigSetter interface and calls it when closing a store.
Guozhang Wang <wa...@gmail.com>, Matthias J. Sax <mj...@apache.org>, John Roesler <jo...@confluent.io>, Bruno Cadonna <br...@confluent.io>
---
.../kafka/streams/state/RocksDBConfigSetter.java | 24 +++++++++++++++++++++-
.../streams/state/internals/RocksDBStore.java | 10 ++++++++-
2 files changed, 32 insertions(+), 2 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBConfigSetter.java b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBConfigSetter.java
index ec7ac8f..613d636 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBConfigSetter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBConfigSetter.java
@@ -20,6 +20,9 @@ import org.rocksdb.Options;
import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* An interface to that allows developers to customize the RocksDB settings for a given Store.
* Please read the <a href="https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide">RocksDB Tuning Guide</a>.
@@ -29,12 +32,31 @@ import java.util.Map;
*/
public interface RocksDBConfigSetter {
+ Logger LOG = LoggerFactory.getLogger(RocksDBConfigSetter.class);
+
/**
* Set the rocks db options for the provided storeName.
*
* @param storeName the name of the store being configured
- * @param options the Rocks DB options
+ * @param options the RocksDB options
* @param configs the configuration supplied to {@link org.apache.kafka.streams.StreamsConfig}
*/
void setConfig(final String storeName, final Options options, final Map<String, Object> configs);
+
+ /**
+ * Close any user-constructed objects that inherit from {@code org.rocksdb.RocksObject}.
+ * <p>
+ * Any object created with {@code new} in {@link RocksDBConfigSetter#setConfig setConfig()} and that inherits
+ * from {@code org.rocksdb.RocksObject} should have {@code org.rocksdb.RocksObject#close()}
+ * called on it here to avoid leaking off-heap memory. Objects to be closed can be saved by the user or retrieved
+ * back from {@code options} using its getter methods.
+ * <p>
+ * Example objects needing to be closed include {@code org.rocksdb.Filter} and {@code org.rocksdb.Cache}.
+ *
+ * @param storeName the name of the store being configured
+ * @param options the RocksDB options
+ */
+ default void close(final String storeName, final Options options) {
+ LOG.warn("The default close will be removed in 3.0.0 -- you should overwrite it if you have implemented RocksDBConfigSetter");
+ }
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index e4c416c..ed74468 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -92,6 +92,8 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BulkLoadingSt
FlushOptions fOptions;
private BloomFilter filter;
+ private RocksDBConfigSetter configSetter;
+
private volatile boolean prepareForBulkload = false;
ProcessorContext internalProcessorContext;
// visible for testing
@@ -154,7 +156,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BulkLoadingSt
(Class<RocksDBConfigSetter>) configs.get(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG);
if (configSetterClass != null) {
- final RocksDBConfigSetter configSetter = Utils.newInstance(configSetterClass);
+ configSetter = Utils.newInstance(configSetterClass);
configSetter.setConfig(name, userSpecifiedOptions, configs);
}
@@ -390,6 +392,12 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BulkLoadingSt
open = false;
closeOpenIterators();
+
+ if (configSetter != null) {
+ configSetter.close(name, userSpecifiedOptions);
+ configSetter = null;
+ }
+
dbAccessor.close();
userSpecifiedOptions.close();
wOptions.close();