You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by bb...@apache.org on 2019/05/09 22:45:37 UTC

[kafka] branch trunk updated: KAFKA-8324: Add close() method to RocksDBConfigSetter (#6697)

This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b2826c6  KAFKA-8324: Add close() method to RocksDBConfigSetter (#6697)
b2826c6 is described below

commit b2826c6c2bfc3360913e63e0b65f9d79e782fd50
Author: A. Sophie Blee-Goldman <so...@confluent.io>
AuthorDate: Thu May 9 15:45:22 2019 -0700

    KAFKA-8324: Add close() method to RocksDBConfigSetter (#6697)
    
    Following KIP-453, this PR adds a default close() method to the RocksDBConfigSetter interface and calls it when closing a store.
    
    Reviewers: Guozhang Wang <wa...@gmail.com>, Matthias J. Sax <mj...@apache.org>, John Roesler <jo...@confluent.io>, Bruno Cadonna <br...@confluent.io>
---
 .../kafka/streams/state/RocksDBConfigSetter.java   | 24 +++++++++++++++++++++-
 .../streams/state/internals/RocksDBStore.java      | 10 ++++++++-
 2 files changed, 32 insertions(+), 2 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBConfigSetter.java b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBConfigSetter.java
index ec7ac8f..613d636 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBConfigSetter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBConfigSetter.java
@@ -20,6 +20,9 @@ import org.rocksdb.Options;
 
 import java.util.Map;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * An interface to that allows developers to customize the RocksDB settings for a given Store.
  * Please read the <a href="https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide">RocksDB Tuning Guide</a>.
@@ -29,12 +32,31 @@ import java.util.Map;
  */
 public interface RocksDBConfigSetter {
 
+    Logger LOG = LoggerFactory.getLogger(RocksDBConfigSetter.class);
+
     /**
      * Set the rocks db options for the provided storeName.
      * 
      * @param storeName     the name of the store being configured
-     * @param options       the Rocks DB options
+     * @param options       the RocksDB options
      * @param configs       the configuration supplied to {@link org.apache.kafka.streams.StreamsConfig}
      */
     void setConfig(final String storeName, final Options options, final Map<String, Object> configs);
+
+    /**
+     * Close any user-constructed objects that inherit from {@code org.rocksdb.RocksObject}.
+     * <p>
+     * Any object created with {@code new} in {@link RocksDBConfigSetter#setConfig setConfig()} and that inherits
+     * from {@code org.rocksdb.RocksObject} should have {@code org.rocksdb.RocksObject#close()}
+     * called on it here to avoid leaking off-heap memory. Objects to be closed can be saved by the user or retrieved
+     * back from {@code options} using its getter methods.
+     * <p>
+     * Example objects needing to be closed include {@code org.rocksdb.Filter} and {@code org.rocksdb.Cache}.
+     *
+     * @param storeName     the name of the store being configured
+     * @param options       the RocksDB options
+     */
+    default void close(final String storeName, final Options options) {
+        LOG.warn("The default close will be removed in 3.0.0 -- you should overwrite it if you have implemented RocksDBConfigSetter");
+    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index e4c416c..ed74468 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -92,6 +92,8 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BulkLoadingSt
     FlushOptions fOptions;
     private BloomFilter filter;
 
+    private RocksDBConfigSetter configSetter;
+
     private volatile boolean prepareForBulkload = false;
     ProcessorContext internalProcessorContext;
     // visible for testing
@@ -154,7 +156,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BulkLoadingSt
             (Class<RocksDBConfigSetter>) configs.get(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG);
 
         if (configSetterClass != null) {
-            final RocksDBConfigSetter configSetter = Utils.newInstance(configSetterClass);
+            configSetter = Utils.newInstance(configSetterClass);
             configSetter.setConfig(name, userSpecifiedOptions, configs);
         }
 
@@ -390,6 +392,12 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BulkLoadingSt
 
         open = false;
         closeOpenIterators();
+
+        if (configSetter != null) {
+            configSetter.close(name, userSpecifiedOptions);
+            configSetter = null;
+        }
+
         dbAccessor.close();
         userSpecifiedOptions.close();
         wOptions.close();