You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/05/31 06:21:18 UTC

kafka git commit: KAFKA-5334: Allow rocksdb.config.setter to be specified as a String or Class instance

Repository: kafka
Updated Branches:
  refs/heads/trunk d08256390 -> dd8cdb79d


KAFKA-5334: Allow rocksdb.config.setter to be specified as a String or Class instance

Handle `rocksdb.config.setter` being set as a class name or class
instance.

Author: Tommy Becker <to...@tivo.com>
Author: Tommy Becker <tw...@gmail.com>

Reviewers: Matthias J. Sax, Damian Guy, Guozhang Wang

Closes #3155 from twbecker/KAFKA-5334


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/dd8cdb79
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/dd8cdb79
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/dd8cdb79

Branch: refs/heads/trunk
Commit: dd8cdb79d356b3049c21bf00613b8144dff0882c
Parents: d082563
Author: Tommy Becker <to...@tivo.com>
Authored: Tue May 30 23:21:12 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue May 30 23:21:12 2017 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/streams/StreamsConfig.java |   2 +-
 .../streams/state/internals/RocksDBStore.java   |   8 +-
 .../state/internals/RocksDBStoreTest.java       | 103 +++++++++++++++++++
 3 files changed, 111 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/dd8cdb79/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index deb5e89..4238490 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -227,7 +227,7 @@ public class StreamsConfig extends AbstractConfig {
 
     /** {@code rocksdb.config.setter} */
     public static final String ROCKSDB_CONFIG_SETTER_CLASS_CONFIG = "rocksdb.config.setter";
-    private static final String ROCKSDB_CONFIG_SETTER_CLASS_DOC = "A Rocks DB config setter class that implements the <code>RocksDBConfigSetter</code> interface";
+    private static final String ROCKSDB_CONFIG_SETTER_CLASS_DOC = "A Rocks DB config setter class or class name that implements the <code>RocksDBConfigSetter</code> interface";
 
     /** {@code security.protocol} */
     public static final String SECURITY_PROTOCOL_CONFIG = CommonClientConfigs.SECURITY_PROTOCOL_CONFIG;

http://git-wip-us.apache.org/repos/asf/kafka/blob/dd8cdb79/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index a01de77..7a0b6ee 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Utils;
@@ -142,7 +143,12 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
         fOptions.setWaitForFlush(true);
 
         final Map<String, Object> configs = context.appConfigs();
-        final Class<RocksDBConfigSetter> configSetterClass = (Class<RocksDBConfigSetter>) configs.get(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG);
+        final Object configSetterValue = configs.get(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG);
+        final Class<RocksDBConfigSetter> configSetterClass = (Class<RocksDBConfigSetter>) ConfigDef.parseType(
+                StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG,
+                configSetterValue,
+                ConfigDef.Type.CLASS);
+
         if (configSetterClass != null) {
             final RocksDBConfigSetter configSetter = Utils.newInstance(configSetterClass);
             configSetter.setConfig(name, options, configs);

http://git-wip-us.apache.org/repos/asf/kafka/blob/dd8cdb79/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
new file mode 100644
index 0000000..c43a39a
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.internals.RecordCollector;
+import org.apache.kafka.streams.state.RocksDBConfigSetter;
+import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.rocksdb.Options;
+
+public class RocksDBStoreTest {
+    private final File tempDir = TestUtils.tempDirectory();
+
+    private RocksDBStore<String, String> subject;
+
+    @Before
+    public void setUp() throws Exception {
+        subject = new RocksDBStore<>("test", Serdes.String(), Serdes.String());
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        subject.close();
+    }
+
+    @Test
+    public void canSpecifyConfigSetterAsClass() throws Exception {
+        final Map<String, Object> configs = new HashMap<>();
+        configs.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, MockRocksDbConfigSetter.class);
+        MockRocksDbConfigSetter.called = false;
+        subject.openDB(new ConfigurableProcessorContext(tempDir, Serdes.String(), Serdes.String(),
+                null, null, configs));
+
+        assertTrue(MockRocksDbConfigSetter.called);
+    }
+
+    @Test
+    public void canSpecifyConfigSetterAsString() throws Exception {
+        final Map<String, Object> configs = new HashMap<>();
+        configs.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, MockRocksDbConfigSetter.class.getName());
+        MockRocksDbConfigSetter.called = false;
+        subject.openDB(new ConfigurableProcessorContext(tempDir, Serdes.String(), Serdes.String(),
+                null, null, configs));
+
+        assertTrue(MockRocksDbConfigSetter.called);
+    }
+
+
+    public static class MockRocksDbConfigSetter implements RocksDBConfigSetter {
+        static boolean called;
+
+        @Override
+        public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
+            called = true;
+        }
+    }
+
+
+    private static class ConfigurableProcessorContext extends MockProcessorContext {
+        final Map<String, Object> configs;
+
+        ConfigurableProcessorContext(final File stateDir,
+                                     final Serde<?> keySerde,
+                                     final Serde<?> valSerde,
+                                     final RecordCollector collector,
+                                     final ThreadCache cache,
+                                     final Map<String, Object> configs) {
+            super(stateDir, keySerde, valSerde, collector, cache);
+            this.configs = configs;
+        }
+
+        @Override
+        public Map<String, Object> appConfigs() {
+            return configs;
+        }
+    }
+}