You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/05/31 06:21:18 UTC
kafka git commit: KAFKA-5334: Allow rocksdb.config.setter to be
specified as a String or Class instance
Repository: kafka
Updated Branches:
refs/heads/trunk d08256390 -> dd8cdb79d
KAFKA-5334: Allow rocksdb.config.setter to be specified as a String or Class instance
Handle `rocksdb.config.setter` being set as a class name or class
instance.
Author: Tommy Becker <to...@tivo.com>
Author: Tommy Becker <tw...@gmail.com>
Reviewers: Matthias J. Sax, Damian Guy, Guozhang Wang
Closes #3155 from twbecker/KAFKA-5334
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/dd8cdb79
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/dd8cdb79
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/dd8cdb79
Branch: refs/heads/trunk
Commit: dd8cdb79d356b3049c21bf00613b8144dff0882c
Parents: d082563
Author: Tommy Becker <to...@tivo.com>
Authored: Tue May 30 23:21:12 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue May 30 23:21:12 2017 -0700
----------------------------------------------------------------------
.../org/apache/kafka/streams/StreamsConfig.java | 2 +-
.../streams/state/internals/RocksDBStore.java | 8 +-
.../state/internals/RocksDBStoreTest.java | 103 +++++++++++++++++++
3 files changed, 111 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/dd8cdb79/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index deb5e89..4238490 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -227,7 +227,7 @@ public class StreamsConfig extends AbstractConfig {
/** {@code rocksdb.config.setter} */
public static final String ROCKSDB_CONFIG_SETTER_CLASS_CONFIG = "rocksdb.config.setter";
- private static final String ROCKSDB_CONFIG_SETTER_CLASS_DOC = "A Rocks DB config setter class that implements the <code>RocksDBConfigSetter</code> interface";
+ private static final String ROCKSDB_CONFIG_SETTER_CLASS_DOC = "A Rocks DB config setter class or class name that implements the <code>RocksDBConfigSetter</code> interface";
/** {@code security.protocol} */
public static final String SECURITY_PROTOCOL_CONFIG = CommonClientConfigs.SECURITY_PROTOCOL_CONFIG;
http://git-wip-us.apache.org/repos/asf/kafka/blob/dd8cdb79/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index a01de77..7a0b6ee 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
@@ -142,7 +143,12 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
fOptions.setWaitForFlush(true);
final Map<String, Object> configs = context.appConfigs();
- final Class<RocksDBConfigSetter> configSetterClass = (Class<RocksDBConfigSetter>) configs.get(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG);
+ final Object configSetterValue = configs.get(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG);
+ final Class<RocksDBConfigSetter> configSetterClass = (Class<RocksDBConfigSetter>) ConfigDef.parseType(
+ StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG,
+ configSetterValue,
+ ConfigDef.Type.CLASS);
+
if (configSetterClass != null) {
final RocksDBConfigSetter configSetter = Utils.newInstance(configSetterClass);
configSetter.setConfig(name, options, configs);
http://git-wip-us.apache.org/repos/asf/kafka/blob/dd8cdb79/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
new file mode 100644
index 0000000..c43a39a
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.internals.RecordCollector;
+import org.apache.kafka.streams.state.RocksDBConfigSetter;
+import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.rocksdb.Options;
+
+public class RocksDBStoreTest {
+ private final File tempDir = TestUtils.tempDirectory();
+
+ private RocksDBStore<String, String> subject;
+
+ @Before
+ public void setUp() throws Exception {
+ subject = new RocksDBStore<>("test", Serdes.String(), Serdes.String());
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ subject.close();
+ }
+
+ @Test
+ public void canSpecifyConfigSetterAsClass() throws Exception {
+ final Map<String, Object> configs = new HashMap<>();
+ configs.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, MockRocksDbConfigSetter.class);
+ MockRocksDbConfigSetter.called = false;
+ subject.openDB(new ConfigurableProcessorContext(tempDir, Serdes.String(), Serdes.String(),
+ null, null, configs));
+
+ assertTrue(MockRocksDbConfigSetter.called);
+ }
+
+ @Test
+ public void canSpecifyConfigSetterAsString() throws Exception {
+ final Map<String, Object> configs = new HashMap<>();
+ configs.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, MockRocksDbConfigSetter.class.getName());
+ MockRocksDbConfigSetter.called = false;
+ subject.openDB(new ConfigurableProcessorContext(tempDir, Serdes.String(), Serdes.String(),
+ null, null, configs));
+
+ assertTrue(MockRocksDbConfigSetter.called);
+ }
+
+
+ public static class MockRocksDbConfigSetter implements RocksDBConfigSetter {
+ static boolean called;
+
+ @Override
+ public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
+ called = true;
+ }
+ }
+
+
+ private static class ConfigurableProcessorContext extends MockProcessorContext {
+ final Map<String, Object> configs;
+
+ ConfigurableProcessorContext(final File stateDir,
+ final Serde<?> keySerde,
+ final Serde<?> valSerde,
+ final RecordCollector collector,
+ final ThreadCache cache,
+ final Map<String, Object> configs) {
+ super(stateDir, keySerde, valSerde, collector, cache);
+ this.configs = configs;
+ }
+
+ @Override
+ public Map<String, Object> appConfigs() {
+ return configs;
+ }
+ }
+}