You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by bb...@apache.org on 2019/05/17 20:32:37 UTC
[kafka] branch trunk updated: KAFKA-8215: Upgrade Rocks to v5.18.3
(#6743)
This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 16769d2 KAFKA-8215: Upgrade Rocks to v5.18.3 (#6743)
16769d2 is described below
commit 16769d263e2e8fd91704d2e3519abf9dcba507df
Author: A. Sophie Blee-Goldman <so...@confluent.io>
AuthorDate: Fri May 17 13:32:17 2019 -0700
KAFKA-8215: Upgrade Rocks to v5.18.3 (#6743)
This upgrade exposes a number of new options, including the WriteBufferManager which -- along with existing TableConfig options -- allows users to limit the total memory used by RocksDB across instances. This can alleviate some cascading OOM potential when, for example, a large number of stateful tasks are suddenly migrated to the same host.
The RocksDB docs guarantee backwards format compatibility across versions
Reviewers: Matthias J. Sax <mj...@apache.org>, Bill Bejeck <bb...@gmail.com>,
---
checkstyle/suppressions.xml | 2 ++
gradle/dependencies.gradle | 2 +-
...tionsToDbOptionsColumnFamilyOptionsAdapter.java | 28 ++++++++++++++++++++++
.../streams/state/internals/RocksDBStore.java | 14 ++++++++---
.../state/internals/RocksDBTimestampedStore.java | 1 +
...sToDbOptionsColumnFamilyOptionsAdapterTest.java | 25 +++++++++++++++++++
.../streams/state/internals/RocksDBStoreTest.java | 7 +++++-
7 files changed, 74 insertions(+), 5 deletions(-)
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 977a7ac..6b1cccc 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -111,6 +111,8 @@
files="KafkaConfigBackingStore.java"/>
<suppress checks="CyclomaticComplexity"
files="(Values|ConnectHeader|ConnectHeaders).java"/>
+ <suppress checks="CyclomaticComplexity"
+ files="RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java"/>
<suppress checks="JavaNCSS"
files="KafkaConfigBackingStore.java"/>
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index bed83fe..d0b897b 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -86,7 +86,7 @@ versions += [
owaspDepCheckPlugin: "4.0.2",
powermock: "2.0.2",
reflections: "0.9.11",
- rocksDB: "5.15.10",
+ rocksDB: "5.18.3",
scalafmt: "1.5.1",
scalatest: "3.0.7",
scoverage: "1.3.1",
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java
index c07e43b..d28682a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java
@@ -16,6 +16,8 @@
*/
package org.apache.kafka.streams.state.internals;
+import org.rocksdb.AbstractCompactionFilter;
+import org.rocksdb.AbstractCompactionFilterFactory;
import org.rocksdb.AbstractComparator;
import org.rocksdb.AbstractSlice;
import org.rocksdb.AccessHint;
@@ -44,6 +46,8 @@ import org.rocksdb.WALRecoveryMode;
import java.util.Collection;
import java.util.List;
+import org.rocksdb.WriteBufferManager;
+import org.slf4j.LoggerFactory;
/**
* The generic {@link Options} class allows users to set all configs on one object if only default column family
@@ -56,6 +60,8 @@ class RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter extends Options
private final DBOptions dbOptions;
private final ColumnFamilyOptions columnFamilyOptions;
+ private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.class);
+
RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter(final DBOptions dbOptions,
final ColumnFamilyOptions columnFamilyOptions) {
this.dbOptions = dbOptions;
@@ -484,6 +490,7 @@ class RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter extends Options
@Override
public Options setWalTtlSeconds(final long walTtlSeconds) {
+ LOG.warn("option walTtlSeconds will be ignored: Streams does not expose RocksDB ttl functionality");
dbOptions.setWalTtlSeconds(walTtlSeconds);
return this;
}
@@ -1355,6 +1362,27 @@ class RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter extends Options
}
@Override
+ public Options setWriteBufferManager(final WriteBufferManager writeBufferManager) {
+ dbOptions.setWriteBufferManager(writeBufferManager);
+ return this;
+ }
+
+ @Override
+ public WriteBufferManager writeBufferManager() {
+ return dbOptions.writeBufferManager();
+ }
+
+ public Options setCompactionFilter(final AbstractCompactionFilter<? extends AbstractSlice<?>> compactionFilter) {
+ columnFamilyOptions.setCompactionFilter(compactionFilter);
+ return this;
+ }
+
+ public Options setCompactionFilterFactory(final AbstractCompactionFilterFactory<? extends AbstractCompactionFilter<?>> compactionFilterFactory) {
+ columnFamilyOptions.setCompactionFilterFactory(compactionFilterFactory);
+ return this;
+ }
+
+ @Override
public void close() {
columnFamilyOptions.close();
dbOptions.close();
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index ed74468..0643e64 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -32,6 +32,7 @@ import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.BloomFilter;
+import org.rocksdb.Cache;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
@@ -40,6 +41,7 @@ import org.rocksdb.CompressionType;
import org.rocksdb.DBOptions;
import org.rocksdb.FlushOptions;
import org.rocksdb.InfoLogLevel;
+import org.rocksdb.LRUCache;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
@@ -86,10 +88,11 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BulkLoadingSt
RocksDB db;
RocksDBAccessor dbAccessor;
- // the following option objects will be created in the constructor and closed in the close() method
+ // the following option objects will be created in openDB and closed in the close() method
private RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter userSpecifiedOptions;
WriteOptions wOptions;
FlushOptions fOptions;
+ private Cache cache;
private BloomFilter filter;
private RocksDBConfigSetter configSetter;
@@ -121,9 +124,10 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BulkLoadingSt
userSpecifiedOptions = new RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter(dbOptions, columnFamilyOptions);
final BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
- tableConfig.setBlockCacheSize(BLOCK_CACHE_SIZE);
+ cache = new LRUCache(BLOCK_CACHE_SIZE);
+ tableConfig.setBlockCache(cache);
tableConfig.setBlockSize(BLOCK_SIZE);
-
+
filter = new BloomFilter();
tableConfig.setFilter(filter);
@@ -404,12 +408,15 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BulkLoadingSt
fOptions.close();
db.close();
filter.close();
+ cache.close();
dbAccessor = null;
userSpecifiedOptions = null;
wOptions = null;
fOptions = null;
db = null;
+ filter = null;
+ cache = null;
}
private void closeOpenIterators() {
@@ -564,6 +571,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BulkLoadingSt
}
@Override
+ @SuppressWarnings("deprecation")
public void toggleDbForBulkLoading() {
try {
db.compactRange(columnFamily, true, 1, 0);
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java
index 5466ce8..05db0ea 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java
@@ -247,6 +247,7 @@ public class RocksDBTimestampedStore extends RocksDBStore implements Timestamped
}
@Override
+ @SuppressWarnings("deprecation")
public void toggleDbForBulkLoading() {
try {
db.compactRange(oldColumnFamily, true, 1, 0);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java
index 74e5cd5..62513a5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java
@@ -20,6 +20,9 @@ import org.easymock.EasyMockRunner;
import org.easymock.Mock;
import org.junit.Test;
import org.junit.runner.RunWith;
+import org.rocksdb.AbstractCompactionFilter;
+import org.rocksdb.AbstractCompactionFilter.Context;
+import org.rocksdb.AbstractCompactionFilterFactory;
import org.rocksdb.AccessHint;
import org.rocksdb.BuiltinComparator;
import org.rocksdb.ColumnFamilyOptions;
@@ -35,11 +38,13 @@ import org.rocksdb.Logger;
import org.rocksdb.Options;
import org.rocksdb.PlainTableConfig;
import org.rocksdb.RateLimiter;
+import org.rocksdb.RemoveEmptyValueCompactionFilter;
import org.rocksdb.RocksDB;
import org.rocksdb.SstFileManager;
import org.rocksdb.StringAppendOperator;
import org.rocksdb.VectorMemTableConfig;
import org.rocksdb.WALRecoveryMode;
+import org.rocksdb.WriteBufferManager;
import org.rocksdb.util.BytewiseComparator;
import java.lang.reflect.InvocationTargetException;
@@ -167,6 +172,9 @@ public class RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest {
case "org.rocksdb.WALRecoveryMode":
parameters[i] = WALRecoveryMode.AbsoluteConsistency;
break;
+ case "org.rocksdb.WriteBufferManager":
+ parameters[i] = new WriteBufferManager(1L, new LRUCache(1L));
+ break;
default:
parameters[i] = parameterTypes[i].newInstance();
}
@@ -229,6 +237,23 @@ public class RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest {
case "java.util.List":
parameters[i] = new ArrayList<>();
break;
+ case "org.rocksdb.AbstractCompactionFilter":
+ parameters[i] = new RemoveEmptyValueCompactionFilter();
+ break;
+ case "org.rocksdb.AbstractCompactionFilterFactory":
+ parameters[i] = new AbstractCompactionFilterFactory() {
+
+ @Override
+ public AbstractCompactionFilter<?> createCompactionFilter(final Context context) {
+ return null;
+ }
+
+ @Override
+ public String name() {
+ return "AbstractCompactionFilterFactory";
+ }
+ };
+ break;
case "org.rocksdb.AbstractComparator":
parameters[i] = new BytewiseComparator(new ComparatorOptions());
break;
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
index bfb1ecd..3d67866 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
@@ -38,6 +38,8 @@ import org.junit.Test;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.BloomFilter;
import org.rocksdb.Filter;
+import org.rocksdb.Cache;
+import org.rocksdb.LRUCache;
import org.rocksdb.Options;
import java.io.File;
@@ -489,11 +491,13 @@ public class RocksDBStoreTest {
static boolean bloomFiltersSet;
static Filter filter;
+ static Cache cache;
@Override
public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
final BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
- tableConfig.setBlockCacheSize(50 * 1024 * 1024L);
+ cache = new LRUCache(50 * 1024 * 1024L);
+ tableConfig.setBlockCache(cache);
tableConfig.setBlockSize(4096L);
if (enableBloomFilters) {
filter = new BloomFilter();
@@ -513,6 +517,7 @@ public class RocksDBStoreTest {
if (filter != null) {
filter.close();
}
+ cache.close();
}
}