You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by bb...@apache.org on 2019/05/17 20:32:37 UTC

[kafka] branch trunk updated: KAFKA-8215: Upgrade Rocks to v5.18.3 (#6743)

This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 16769d2  KAFKA-8215: Upgrade Rocks to v5.18.3 (#6743)
16769d2 is described below

commit 16769d263e2e8fd91704d2e3519abf9dcba507df
Author: A. Sophie Blee-Goldman <so...@confluent.io>
AuthorDate: Fri May 17 13:32:17 2019 -0700

    KAFKA-8215: Upgrade Rocks to v5.18.3 (#6743)
    
    This upgrade exposes a number of new options, including the WriteBufferManager, which -- along with the existing TableConfig options -- allows users to limit the total memory used by RocksDB across instances. This can reduce the risk of cascading OOMs when, for example, a large number of stateful tasks are suddenly migrated to the same host.
    
    The RocksDB docs guarantee backwards format compatibility across versions.
    
    Reviewers: Matthias J. Sax <mj...@apache.org>, Bill Bejeck <bb...@gmail.com>
---
 checkstyle/suppressions.xml                        |  2 ++
 gradle/dependencies.gradle                         |  2 +-
 ...tionsToDbOptionsColumnFamilyOptionsAdapter.java | 28 ++++++++++++++++++++++
 .../streams/state/internals/RocksDBStore.java      | 14 ++++++++---
 .../state/internals/RocksDBTimestampedStore.java   |  1 +
 ...sToDbOptionsColumnFamilyOptionsAdapterTest.java | 25 +++++++++++++++++++
 .../streams/state/internals/RocksDBStoreTest.java  |  7 +++++-
 7 files changed, 74 insertions(+), 5 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 977a7ac..6b1cccc 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -111,6 +111,8 @@
               files="KafkaConfigBackingStore.java"/>
     <suppress checks="CyclomaticComplexity"
               files="(Values|ConnectHeader|ConnectHeaders).java"/>
+    <suppress checks="CyclomaticComplexity"
+              files="RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java"/>
 
     <suppress checks="JavaNCSS"
               files="KafkaConfigBackingStore.java"/>
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index bed83fe..d0b897b 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -86,7 +86,7 @@ versions += [
   owaspDepCheckPlugin: "4.0.2",
   powermock: "2.0.2",
   reflections: "0.9.11",
-  rocksDB: "5.15.10",
+  rocksDB: "5.18.3",
   scalafmt: "1.5.1",
   scalatest: "3.0.7",
   scoverage: "1.3.1",
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java
index c07e43b..d28682a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.rocksdb.AbstractCompactionFilter;
+import org.rocksdb.AbstractCompactionFilterFactory;
 import org.rocksdb.AbstractComparator;
 import org.rocksdb.AbstractSlice;
 import org.rocksdb.AccessHint;
@@ -44,6 +46,8 @@ import org.rocksdb.WALRecoveryMode;
 
 import java.util.Collection;
 import java.util.List;
+import org.rocksdb.WriteBufferManager;
+import org.slf4j.LoggerFactory;
 
 /**
  * The generic {@link Options} class allows users to set all configs on one object if only default column family
@@ -56,6 +60,8 @@ class RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter extends Options
     private final DBOptions dbOptions;
     private final ColumnFamilyOptions columnFamilyOptions;
 
+    private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.class);
+
     RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter(final DBOptions dbOptions,
                                                                final ColumnFamilyOptions columnFamilyOptions) {
         this.dbOptions = dbOptions;
@@ -484,6 +490,7 @@ class RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter extends Options
 
     @Override
     public Options setWalTtlSeconds(final long walTtlSeconds) {
+        LOG.warn("option walTtlSeconds will be ignored: Streams does not expose RocksDB ttl functionality");
         dbOptions.setWalTtlSeconds(walTtlSeconds);
         return this;
     }
@@ -1355,6 +1362,27 @@ class RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter extends Options
     }
 
     @Override
+    public Options setWriteBufferManager(final WriteBufferManager writeBufferManager) {
+        dbOptions.setWriteBufferManager(writeBufferManager);
+        return this;
+    }
+
+    @Override
+    public WriteBufferManager writeBufferManager() {
+        return dbOptions.writeBufferManager();
+    }
+
+    public Options setCompactionFilter(final AbstractCompactionFilter<? extends AbstractSlice<?>> compactionFilter) {
+        columnFamilyOptions.setCompactionFilter(compactionFilter);
+        return this;
+    }
+
+    public Options setCompactionFilterFactory(final AbstractCompactionFilterFactory<? extends AbstractCompactionFilter<?>> compactionFilterFactory) {
+        columnFamilyOptions.setCompactionFilterFactory(compactionFilterFactory);
+        return this;
+    }
+
+    @Override
     public void close() {
         columnFamilyOptions.close();
         dbOptions.close();
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index ed74468..0643e64 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -32,6 +32,7 @@ import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.RocksDBConfigSetter;
 import org.rocksdb.BlockBasedTableConfig;
 import org.rocksdb.BloomFilter;
+import org.rocksdb.Cache;
 import org.rocksdb.ColumnFamilyDescriptor;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.ColumnFamilyOptions;
@@ -40,6 +41,7 @@ import org.rocksdb.CompressionType;
 import org.rocksdb.DBOptions;
 import org.rocksdb.FlushOptions;
 import org.rocksdb.InfoLogLevel;
+import org.rocksdb.LRUCache;
 import org.rocksdb.Options;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
@@ -86,10 +88,11 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BulkLoadingSt
     RocksDB db;
     RocksDBAccessor dbAccessor;
 
-    // the following option objects will be created in the constructor and closed in the close() method
+    // the following option objects will be created in openDB and closed in the close() method
     private RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter userSpecifiedOptions;
     WriteOptions wOptions;
     FlushOptions fOptions;
+    private Cache cache;
     private BloomFilter filter;
 
     private RocksDBConfigSetter configSetter;
@@ -121,9 +124,10 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BulkLoadingSt
         userSpecifiedOptions = new RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter(dbOptions, columnFamilyOptions);
 
         final BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
-        tableConfig.setBlockCacheSize(BLOCK_CACHE_SIZE);
+        cache = new LRUCache(BLOCK_CACHE_SIZE);
+        tableConfig.setBlockCache(cache);
         tableConfig.setBlockSize(BLOCK_SIZE);
-        
+
         filter = new BloomFilter();
         tableConfig.setFilter(filter);
 
@@ -404,12 +408,15 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BulkLoadingSt
         fOptions.close();
         db.close();
         filter.close();
+        cache.close();
 
         dbAccessor = null;
         userSpecifiedOptions = null;
         wOptions = null;
         fOptions = null;
         db = null;
+        filter = null;
+        cache = null;
     }
 
     private void closeOpenIterators() {
@@ -564,6 +571,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BulkLoadingSt
         }
 
         @Override
+        @SuppressWarnings("deprecation")
         public void toggleDbForBulkLoading() {
             try {
                 db.compactRange(columnFamily, true, 1, 0);
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java
index 5466ce8..05db0ea 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java
@@ -247,6 +247,7 @@ public class RocksDBTimestampedStore extends RocksDBStore implements Timestamped
         }
 
         @Override
+        @SuppressWarnings("deprecation")
         public void toggleDbForBulkLoading() {
             try {
                 db.compactRange(oldColumnFamily, true, 1, 0);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java
index 74e5cd5..62513a5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java
@@ -20,6 +20,9 @@ import org.easymock.EasyMockRunner;
 import org.easymock.Mock;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.rocksdb.AbstractCompactionFilter;
+import org.rocksdb.AbstractCompactionFilter.Context;
+import org.rocksdb.AbstractCompactionFilterFactory;
 import org.rocksdb.AccessHint;
 import org.rocksdb.BuiltinComparator;
 import org.rocksdb.ColumnFamilyOptions;
@@ -35,11 +38,13 @@ import org.rocksdb.Logger;
 import org.rocksdb.Options;
 import org.rocksdb.PlainTableConfig;
 import org.rocksdb.RateLimiter;
+import org.rocksdb.RemoveEmptyValueCompactionFilter;
 import org.rocksdb.RocksDB;
 import org.rocksdb.SstFileManager;
 import org.rocksdb.StringAppendOperator;
 import org.rocksdb.VectorMemTableConfig;
 import org.rocksdb.WALRecoveryMode;
+import org.rocksdb.WriteBufferManager;
 import org.rocksdb.util.BytewiseComparator;
 
 import java.lang.reflect.InvocationTargetException;
@@ -167,6 +172,9 @@ public class RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest {
                 case "org.rocksdb.WALRecoveryMode":
                     parameters[i] = WALRecoveryMode.AbsoluteConsistency;
                     break;
+                case "org.rocksdb.WriteBufferManager":
+                    parameters[i] = new WriteBufferManager(1L, new LRUCache(1L));
+                    break;
                 default:
                     parameters[i] = parameterTypes[i].newInstance();
             }
@@ -229,6 +237,23 @@ public class RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest {
                 case "java.util.List":
                     parameters[i] = new ArrayList<>();
                     break;
+                case "org.rocksdb.AbstractCompactionFilter":
+                    parameters[i] = new RemoveEmptyValueCompactionFilter();
+                    break;
+                case "org.rocksdb.AbstractCompactionFilterFactory":
+                    parameters[i] = new AbstractCompactionFilterFactory() {
+
+                        @Override
+                        public AbstractCompactionFilter<?> createCompactionFilter(final Context context) {
+                            return null;
+                        }
+
+                        @Override
+                        public String name() {
+                            return "AbstractCompactionFilterFactory";
+                        }
+                    };
+                    break;
                 case "org.rocksdb.AbstractComparator":
                     parameters[i] = new BytewiseComparator(new ComparatorOptions());
                     break;
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
index bfb1ecd..3d67866 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
@@ -38,6 +38,8 @@ import org.junit.Test;
 import org.rocksdb.BlockBasedTableConfig;
 import org.rocksdb.BloomFilter;
 import org.rocksdb.Filter;
+import org.rocksdb.Cache;
+import org.rocksdb.LRUCache;
 import org.rocksdb.Options;
 
 import java.io.File;
@@ -489,11 +491,13 @@ public class RocksDBStoreTest {
 
         static boolean bloomFiltersSet;
         static Filter filter;
+        static Cache cache;
 
         @Override
         public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
             final BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
-            tableConfig.setBlockCacheSize(50 * 1024 * 1024L);
+            cache = new LRUCache(50 * 1024 * 1024L);
+            tableConfig.setBlockCache(cache);
             tableConfig.setBlockSize(4096L);
             if (enableBloomFilters) {
                 filter = new BloomFilter();
@@ -513,6 +517,7 @@ public class RocksDBStoreTest {
             if (filter != null) {
                 filter.close();
             }
+            cache.close();
         }
     }