You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/07/21 18:04:12 UTC

kafka git commit: KAFKA-3740: Enable configuration of RocksDBStores

Repository: kafka
Updated Branches:
  refs/heads/trunk 0e5700fb6 -> 999108667


KAFKA-3740: Enable configuration of RocksDBStores

Add new config StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG to enable advanced
RocksDB users to override default RocksDB configuration

Author: Damian Guy <da...@gmail.com>

Reviewers: Roger Hoover, Dan Norwood, Eno Thereska, Guozhang Wang

Closes #1640 from dguy/kafka-3740-listener


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/99910866
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/99910866
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/99910866

Branch: refs/heads/trunk
Commit: 9991086671b0a74bc5fd5698ba81aaf9a2985404
Parents: 0e5700f
Author: Damian Guy <da...@gmail.com>
Authored: Thu Jul 21 11:04:08 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Jul 21 11:04:08 2016 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/streams/StreamsConfig.java | 11 +++++-
 .../streams/state/RocksDBConfigSetter.java      | 37 ++++++++++++++++++++
 .../streams/state/internals/RocksDBStore.java   | 12 ++++++-
 .../streams/state/KeyValueStoreTestDriver.java  | 15 ++++++--
 .../internals/RocksDBKeyValueStoreTest.java     | 28 +++++++++++++++
 .../StreamThreadStateStoreProviderTest.java     | 12 ++++++-
 .../apache/kafka/test/MockProcessorContext.java |  4 +--
 7 files changed, 111 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/99910866/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 7f32434..a68de4f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -118,6 +118,10 @@ public class StreamsConfig extends AbstractConfig {
     /** <code>client.id</code> */
     public static final String CLIENT_ID_CONFIG = CommonClientConfigs.CLIENT_ID_CONFIG;
 
+    public static final String ROCKSDB_CONFIG_SETTER_CLASS_CONFIG = "rocksdb.config.setter";
+    public static final String ROCKSDB_CONFIG_SETTER_CLASS_DOC = "A Rocks DB config setter class that implements the <code>RocksDBConfigSetter</code> interface";
+
+
     static {
         CONFIG = new ConfigDef().define(APPLICATION_ID_CONFIG,      // required with no default value
                                         Type.STRING,
@@ -213,7 +217,12 @@ public class StreamsConfig extends AbstractConfig {
                                         2,
                                         atLeast(1),
                                         Importance.LOW,
-                                        CommonClientConfigs.METRICS_NUM_SAMPLES_DOC);
+                                        CommonClientConfigs.METRICS_NUM_SAMPLES_DOC)
+                                .define(ROCKSDB_CONFIG_SETTER_CLASS_CONFIG,
+                                        Type.CLASS,
+                                        null,
+                                        Importance.LOW,
+                                        ROCKSDB_CONFIG_SETTER_CLASS_DOC);
     }
 
     // this is the list of configs for underlying clients

http://git-wip-us.apache.org/repos/asf/kafka/blob/99910866/streams/src/main/java/org/apache/kafka/streams/state/RocksDBConfigSetter.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBConfigSetter.java b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBConfigSetter.java
new file mode 100644
index 0000000..20a65f1
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBConfigSetter.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import org.rocksdb.Options;
+
+import java.util.Map;
+
+/**
+ * An interface to that allows developers to customize the RocksDB settings
+ * for a given Store. Please read the <a href="https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide">RocksDB Tuning Guide</a>.
+ */
+public interface RocksDBConfigSetter {
+
+    /**
+     * Set the rocks db options for the provided storeName.
+     * 
+     * @param storeName     the name of the store being configured
+     * @param options       the Rocks DB options
+     * @param configs       the configuration supplied to {@link org.apache.kafka.streams.StreamsConfig}
+     */
+    void setConfig(final String storeName, final Options options, final Map<String, Object> configs);
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/99910866/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index a8badcd..4993173 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -19,13 +19,16 @@ package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.RocksDBConfigSetter;
 import org.apache.kafka.streams.state.StateSerdes;
 
 import org.rocksdb.BlockBasedTableConfig;
@@ -44,6 +47,7 @@ import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Set;
 
@@ -121,7 +125,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
         this.keySerde = keySerde;
         this.valueSerde = valueSerde;
 
-        // initialize the rocksdb options
+        // initialize the default rocksdb options
         BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
         tableConfig.setBlockCacheSize(BLOCK_CACHE_SIZE);
         tableConfig.setBlockSize(BLOCK_SIZE);
@@ -158,6 +162,12 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
 
     @SuppressWarnings("unchecked")
     public void openDB(ProcessorContext context) {
+        final Map<String, Object> configs = context.appConfigs();
+        final Class<RocksDBConfigSetter> configSetterClass = (Class<RocksDBConfigSetter>) configs.get(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG);
+        if (configSetterClass != null) {
+            final RocksDBConfigSetter configSetter = Utils.newInstance(configSetterClass);
+            configSetter.setConfig(name, options, configs);
+        }
         // we need to construct the serde while opening DB since
         // it is also triggered by windowed DB segments without initialization
         this.serdes = new StateSerdes<>(name,

http://git-wip-us.apache.org/repos/asf/kafka/blob/99910866/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
index 1861e06..5519ab4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -131,6 +131,8 @@ import java.util.Set;
  */
 public class KeyValueStoreTestDriver<K, V> {
 
+    private final Properties props;
+
     /**
      * Create a driver object that will have a {@link #context()} that records messages
      * {@link ProcessorContext#forward(Object, Object) forwarded} by the store and that provides default serializers and
@@ -214,12 +216,15 @@ public class KeyValueStoreTestDriver<K, V> {
         this.stateDir = TestUtils.tempDirectory();
         this.stateDir.mkdirs();
 
-        Properties props = new Properties();
+        props = new Properties();
+        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "applicationId");
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
         props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class);
         props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, serdes.keySerde().getClass());
         props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, serdes.valueSerde().getClass());
 
+
+
         this.context = new MockProcessorContext(null, this.stateDir, serdes.keySerde(), serdes.valueSerde(), recordCollector) {
             @Override
             public TaskId taskId() {
@@ -254,12 +259,12 @@ public class KeyValueStoreTestDriver<K, V> {
 
             @Override
             public Map<String, Object> appConfigs() {
-                return null;
+                return new StreamsConfig(props).originals();
             }
 
             @Override
             public Map<String, Object> appConfigsWithPrefix(String prefix) {
-                return null;
+                return new StreamsConfig(props).originalsWithPrefix(prefix);
             }
         };
     }
@@ -419,4 +424,8 @@ public class KeyValueStoreTestDriver<K, V> {
         flushedEntries.clear();
         flushedRemovals.clear();
     }
+
+    public void setConfig(final String configName, final Object configValue) {
+        props.put(configName, configValue);
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/99910866/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
index 280255a..6b8f3f3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
@@ -16,10 +16,19 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.KeyValueStoreTestDriver;
+import org.apache.kafka.streams.state.RocksDBConfigSetter;
 import org.apache.kafka.streams.state.Stores;
+import org.junit.Test;
+import org.rocksdb.Options;
+
+import java.util.Map;
+
+import static org.junit.Assert.assertTrue;
 
 public class RocksDBKeyValueStoreTest extends AbstractKeyValueStoreTest {
 
@@ -43,4 +52,23 @@ public class RocksDBKeyValueStoreTest extends AbstractKeyValueStoreTest {
         return store;
 
     }
+
+    public static class TheRocksDbConfigSetter implements RocksDBConfigSetter {
+
+        static boolean called = false;
+
+        @Override
+        public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
+            called = true;
+        }
+    }
+
+    @Test
+    public void shouldUseCustomRocksDbConfigSetter() throws Exception {
+        final KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create(Integer.class, String.class);
+        driver.setConfig(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, TheRocksDbConfigSetter.class);
+        createKeyValueStore(driver.context(), Integer.class, String.class, false);
+        assertTrue(TheRocksDbConfigSetter.called);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/99910866/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index c105790..f2dbcff 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.processor.TaskId;
@@ -35,9 +36,11 @@ import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.test.MockClientSupplier;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.TestUtils;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
@@ -57,6 +60,7 @@ public class StreamThreadStateStoreProviderTest {
     private StreamTask taskTwo;
     private StreamThreadStateStoreProvider provider;
     private StateDirectory stateDirectory;
+    private File stateDir;
 
     @Before
     public void before() throws IOException {
@@ -77,7 +81,8 @@ public class StreamThreadStateStoreProviderTest {
         final String applicationId = "applicationId";
         properties.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
         properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
-        final String stateConfigDir = TestUtils.tempDirectory().getPath();
+        stateDir = TestUtils.tempDirectory();
+        final String stateConfigDir = stateDir.getPath();
         properties.put(StreamsConfig.STATE_DIR_CONFIG,
                 stateConfigDir);
 
@@ -111,6 +116,11 @@ public class StreamThreadStateStoreProviderTest {
 
     }
 
+    @After
+    public void cleanUp() {
+        Utils.delete(stateDir);
+    }
+    
     @Test
     public void shouldFindKeyValueStores() throws Exception {
         List<ReadOnlyKeyValueStore<String, String>> kvStores =

http://git-wip-us.apache.org/repos/asf/kafka/blob/99910866/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
index dba82ca..d82580d 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
@@ -191,12 +191,12 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
 
     @Override
     public Map<String, Object> appConfigs() {
-        return null;
+        return Collections.emptyMap();
     }
 
     @Override
     public Map<String, Object> appConfigsWithPrefix(String prefix) {
-        return null;
+        return Collections.emptyMap();
     }
 
     public Map<String, StateStore> allStateStores() {