You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/07/21 18:04:12 UTC
kafka git commit: KAFKA-3740: Enable configuration of RocksDBStores
Repository: kafka
Updated Branches:
refs/heads/trunk 0e5700fb6 -> 999108667
KAFKA-3740: Enable configuration of RocksDBStores
Add new config StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG to enable advanced
RocksDB users to override default RocksDB configuration
Author: Damian Guy <da...@gmail.com>
Reviewers: Roger Hoover, Dan Norwood, Eno Thereska, Guozhang Wang
Closes #1640 from dguy/kafka-3740-listener
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/99910866
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/99910866
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/99910866
Branch: refs/heads/trunk
Commit: 9991086671b0a74bc5fd5698ba81aaf9a2985404
Parents: 0e5700f
Author: Damian Guy <da...@gmail.com>
Authored: Thu Jul 21 11:04:08 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Jul 21 11:04:08 2016 -0700
----------------------------------------------------------------------
.../org/apache/kafka/streams/StreamsConfig.java | 11 +++++-
.../streams/state/RocksDBConfigSetter.java | 37 ++++++++++++++++++++
.../streams/state/internals/RocksDBStore.java | 12 ++++++-
.../streams/state/KeyValueStoreTestDriver.java | 15 ++++++--
.../internals/RocksDBKeyValueStoreTest.java | 28 +++++++++++++++
.../StreamThreadStateStoreProviderTest.java | 12 ++++++-
.../apache/kafka/test/MockProcessorContext.java | 4 +--
7 files changed, 111 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/99910866/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 7f32434..a68de4f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -118,6 +118,10 @@ public class StreamsConfig extends AbstractConfig {
/** <code>client.id</code> */
public static final String CLIENT_ID_CONFIG = CommonClientConfigs.CLIENT_ID_CONFIG;
+ public static final String ROCKSDB_CONFIG_SETTER_CLASS_CONFIG = "rocksdb.config.setter";
+ public static final String ROCKSDB_CONFIG_SETTER_CLASS_DOC = "A Rocks DB config setter class that implements the <code>RocksDBConfigSetter</code> interface";
+
+
static {
CONFIG = new ConfigDef().define(APPLICATION_ID_CONFIG, // required with no default value
Type.STRING,
@@ -213,7 +217,12 @@ public class StreamsConfig extends AbstractConfig {
2,
atLeast(1),
Importance.LOW,
- CommonClientConfigs.METRICS_NUM_SAMPLES_DOC);
+ CommonClientConfigs.METRICS_NUM_SAMPLES_DOC)
+ .define(ROCKSDB_CONFIG_SETTER_CLASS_CONFIG,
+ Type.CLASS,
+ null,
+ Importance.LOW,
+ ROCKSDB_CONFIG_SETTER_CLASS_DOC);
}
// this is the list of configs for underlying clients
http://git-wip-us.apache.org/repos/asf/kafka/blob/99910866/streams/src/main/java/org/apache/kafka/streams/state/RocksDBConfigSetter.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBConfigSetter.java b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBConfigSetter.java
new file mode 100644
index 0000000..20a65f1
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBConfigSetter.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import org.rocksdb.Options;
+
+import java.util.Map;
+
+/**
+ * An interface that allows developers to customize the RocksDB settings
+ * for a given Store. Please read the <a href="https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide">RocksDB Tuning Guide</a>.
+ */
+public interface RocksDBConfigSetter {
+
+ /**
+ * Set the RocksDB options for the provided storeName.
+ *
+ * @param storeName the name of the store being configured
+ * @param options the RocksDB options
+ * @param configs the configuration supplied to {@link org.apache.kafka.streams.StreamsConfig}
+ */
+ void setConfig(final String storeName, final Options options, final Map<String, Object> configs);
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/99910866/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index a8badcd..4993173 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -19,13 +19,16 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.apache.kafka.streams.state.StateSerdes;
import org.rocksdb.BlockBasedTableConfig;
@@ -44,6 +47,7 @@ import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
@@ -121,7 +125,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
this.keySerde = keySerde;
this.valueSerde = valueSerde;
- // initialize the rocksdb options
+ // initialize the default rocksdb options
BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
tableConfig.setBlockCacheSize(BLOCK_CACHE_SIZE);
tableConfig.setBlockSize(BLOCK_SIZE);
@@ -158,6 +162,12 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
@SuppressWarnings("unchecked")
public void openDB(ProcessorContext context) {
+ final Map<String, Object> configs = context.appConfigs();
+ final Class<RocksDBConfigSetter> configSetterClass = (Class<RocksDBConfigSetter>) configs.get(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG);
+ if (configSetterClass != null) {
+ final RocksDBConfigSetter configSetter = Utils.newInstance(configSetterClass);
+ configSetter.setConfig(name, options, configs);
+ }
// we need to construct the serde while opening DB since
// it is also triggered by windowed DB segments without initialization
this.serdes = new StateSerdes<>(name,
http://git-wip-us.apache.org/repos/asf/kafka/blob/99910866/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
index 1861e06..5519ab4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -131,6 +131,8 @@ import java.util.Set;
*/
public class KeyValueStoreTestDriver<K, V> {
+ private final Properties props;
+
/**
* Create a driver object that will have a {@link #context()} that records messages
* {@link ProcessorContext#forward(Object, Object) forwarded} by the store and that provides default serializers and
@@ -214,12 +216,15 @@ public class KeyValueStoreTestDriver<K, V> {
this.stateDir = TestUtils.tempDirectory();
this.stateDir.mkdirs();
- Properties props = new Properties();
+ props = new Properties();
+ props.put(StreamsConfig.APPLICATION_ID_CONFIG, "applicationId");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class);
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, serdes.keySerde().getClass());
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, serdes.valueSerde().getClass());
+
+
this.context = new MockProcessorContext(null, this.stateDir, serdes.keySerde(), serdes.valueSerde(), recordCollector) {
@Override
public TaskId taskId() {
@@ -254,12 +259,12 @@ public class KeyValueStoreTestDriver<K, V> {
@Override
public Map<String, Object> appConfigs() {
- return null;
+ return new StreamsConfig(props).originals();
}
@Override
public Map<String, Object> appConfigsWithPrefix(String prefix) {
- return null;
+ return new StreamsConfig(props).originalsWithPrefix(prefix);
}
};
}
@@ -419,4 +424,8 @@ public class KeyValueStoreTestDriver<K, V> {
flushedEntries.clear();
flushedRemovals.clear();
}
+
+ public void setConfig(final String configName, final Object configValue) {
+ props.put(configName, configValue);
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/99910866/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
index 280255a..6b8f3f3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
@@ -16,10 +16,19 @@
*/
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.KeyValueStoreTestDriver;
+import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.apache.kafka.streams.state.Stores;
+import org.junit.Test;
+import org.rocksdb.Options;
+
+import java.util.Map;
+
+import static org.junit.Assert.assertTrue;
public class RocksDBKeyValueStoreTest extends AbstractKeyValueStoreTest {
@@ -43,4 +52,23 @@ public class RocksDBKeyValueStoreTest extends AbstractKeyValueStoreTest {
return store;
}
+
+ public static class TheRocksDbConfigSetter implements RocksDBConfigSetter {
+
+ static boolean called = false;
+
+ @Override
+ public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
+ called = true;
+ }
+ }
+
+ @Test
+ public void shouldUseCustomRocksDbConfigSetter() throws Exception {
+ final KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create(Integer.class, String.class);
+ driver.setConfig(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, TheRocksDbConfigSetter.class);
+ createKeyValueStore(driver.context(), Integer.class, String.class, false);
+ assertTrue(TheRocksDbConfigSetter.called);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/99910866/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index c105790..f2dbcff 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.processor.TaskId;
@@ -35,9 +36,11 @@ import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.MockClientSupplier;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.TestUtils;
+import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
@@ -57,6 +60,7 @@ public class StreamThreadStateStoreProviderTest {
private StreamTask taskTwo;
private StreamThreadStateStoreProvider provider;
private StateDirectory stateDirectory;
+ private File stateDir;
@Before
public void before() throws IOException {
@@ -77,7 +81,8 @@ public class StreamThreadStateStoreProviderTest {
final String applicationId = "applicationId";
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
- final String stateConfigDir = TestUtils.tempDirectory().getPath();
+ stateDir = TestUtils.tempDirectory();
+ final String stateConfigDir = stateDir.getPath();
properties.put(StreamsConfig.STATE_DIR_CONFIG,
stateConfigDir);
@@ -111,6 +116,11 @@ public class StreamThreadStateStoreProviderTest {
}
+ @After
+ public void cleanUp() {
+ Utils.delete(stateDir);
+ }
+
@Test
public void shouldFindKeyValueStores() throws Exception {
List<ReadOnlyKeyValueStore<String, String>> kvStores =
http://git-wip-us.apache.org/repos/asf/kafka/blob/99910866/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
index dba82ca..d82580d 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
@@ -191,12 +191,12 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
@Override
public Map<String, Object> appConfigs() {
- return null;
+ return Collections.emptyMap();
}
@Override
public Map<String, Object> appConfigsWithPrefix(String prefix) {
- return null;
+ return Collections.emptyMap();
}
public Map<String, StateStore> allStateStores() {