You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by li...@apache.org on 2020/06/26 14:34:04 UTC
[flink] 03/03: [FLINK-17800][rocksdb] Support customized RocksDB
write/read options and use RocksDBResourceContainer to get them
This is an automated email from the ASF dual-hosted git repository.
liyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 1718f50645ddc01d5e2e13cc5627bafe98191fa2
Author: Yu Li <li...@apache.org>
AuthorDate: Tue Jun 16 14:01:29 2020 +0800
[FLINK-17800][rocksdb] Support customized RocksDB write/read options and use RocksDBResourceContainer to get them
---
.../streaming/state/RocksDBKeyedStateBackend.java | 8 ++---
.../state/RocksDBKeyedStateBackendBuilder.java | 17 ++---------
.../streaming/state/RocksDBOptionsFactory.java | 34 ++++++++++++++++++++++
.../streaming/state/RocksDBResourceContainer.java | 34 ++++++++++++++++++++++
.../state/RocksDBResourceContainerTest.java | 14 +++++++++
5 files changed, 86 insertions(+), 21 deletions(-)
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index f0cce0b..24b897a 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -219,8 +219,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
ExecutionConfig executionConfig,
TtlTimeProvider ttlTimeProvider,
RocksDB db,
- WriteOptions writeOptions,
- ReadOptions readOptions,
LinkedHashMap<String, RocksDbKvStateInfo> kvStateInformation,
int keyGroupPrefixBytes,
CloseableRegistry cancelStreamRegistry,
@@ -259,8 +257,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
this.keyGroupPrefixBytes = keyGroupPrefixBytes;
this.kvStateInformation = kvStateInformation;
- this.writeOptions = writeOptions;
- this.readOptions = readOptions;
+ this.writeOptions = optionsContainer.getWriteOptions();
+ this.readOptions = optionsContainer.getReadOptions();
checkArgument(writeBatchSize >= 0, "Write batch size have to be no negative value.");
this.writeBatchSize = writeBatchSize;
this.db = db;
@@ -370,8 +368,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
columnFamilyOptions.forEach(IOUtils::closeQuietly);
IOUtils.closeQuietly(optionsContainer);
- IOUtils.closeQuietly(readOptions);
- IOUtils.closeQuietly(writeOptions);
ttlCompactFiltersManager.disposeAndClearRegisteredCompactionFactories();
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
index b9bdbb8..1846bde 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
@@ -54,9 +54,7 @@ import org.apache.flink.util.ResourceGuard;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
-import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
-import org.rocksdb.WriteOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -243,9 +241,6 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
ColumnFamilyHandle defaultColumnFamilyHandle = null;
RocksDBNativeMetricMonitor nativeMetricMonitor = null;
CloseableRegistry cancelStreamRegistryForBackend = new CloseableRegistry();
- // The write options to use in the states.
- WriteOptions writeOptions = null;
- ReadOptions readOptions = null;
LinkedHashMap<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation = new LinkedHashMap<>();
RocksDB db = null;
AbstractRocksDBRestoreOperation restoreOperation = null;
@@ -283,9 +278,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
}
}
- writeOptions = new WriteOptions().setDisableWAL(true);
- readOptions = RocksDBOperationUtils.createTotalOrderSeekReadOptions();
- writeBatchWrapper = new RocksDBWriteBatchWrapper(db, writeOptions, writeBatchSize);
+ writeBatchWrapper = new RocksDBWriteBatchWrapper(db, optionsContainer.getWriteOptions(), writeBatchSize);
// it is important that we only create the key builder after the restore, and not before;
// restore operations may reconfigure the key serializer, so accessing the key serializer
// only now we can be certain that the key serializer used in the builder is final.
@@ -301,7 +294,6 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
keyGroupPrefixBytes,
kvStateInformation,
db,
- readOptions,
writeBatchWrapper,
nativeMetricMonitor);
} catch (Throwable e) {
@@ -321,8 +313,6 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
IOUtils.closeQuietly(restoreOperation);
IOUtils.closeAllQuietly(columnFamilyOptions);
IOUtils.closeQuietly(optionsContainer);
- IOUtils.closeQuietly(readOptions);
- IOUtils.closeQuietly(writeOptions);
ttlCompactFiltersManager.disposeAndClearRegisteredCompactionFactories();
kvStateInformation.clear();
try {
@@ -353,8 +343,6 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
this.executionConfig,
this.ttlTimeProvider,
db,
- writeOptions,
- readOptions,
kvStateInformation,
keyGroupPrefixBytes,
cancelStreamRegistryForBackend,
@@ -483,7 +471,6 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
int keyGroupPrefixBytes,
Map<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation,
RocksDB db,
- ReadOptions readOptions,
RocksDBWriteBatchWrapper writeBatchWrapper,
RocksDBNativeMetricMonitor nativeMetricMonitor) {
PriorityQueueSetFactory priorityQueueFactory;
@@ -498,7 +485,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
numberOfKeyGroups,
kvStateInformation,
db,
- readOptions,
+ optionsContainer.getReadOptions(),
writeBatchWrapper,
nativeMetricMonitor,
columnFamilyOptionsFactory
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactory.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactory.java
index 7e86d22..487b3da6 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactory.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactory.java
@@ -20,6 +20,8 @@ package org.apache.flink.contrib.streaming.state;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.WriteOptions;
import java.util.Collection;
@@ -90,4 +92,36 @@ public interface RocksDBOptionsFactory extends java.io.Serializable {
default RocksDBNativeMetricOptions createNativeMetricsOptions(RocksDBNativeMetricOptions nativeMetricOptions) {
return nativeMetricOptions;
}
+
+ /**
+ * Sets additional {@link WriteOptions} on top of the given options object; the result is
+ * used for the state backend's write operations. The given object may already carry
+ * pre-defined options from the state backend (e.g. the write-ahead log is disabled).
+ *
+ * <p>It is important to set the options on the current object and return the result from
+ * the setter methods, otherwise the pre-defined options may get lost.
+ *
+ * @param currentOptions The options object with the pre-defined options.
+ * @param handlesToClose The collection in which to register newly created {@link org.rocksdb.RocksObject}s for closing.
+ * @return The options object on which the additional options are set (by default {@code currentOptions} unchanged).
+ */
+ default WriteOptions createWriteOptions(WriteOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
+ return currentOptions;
+ }
+
+ /**
+ * Sets additional {@link ReadOptions} on top of the given options object; the result is
+ * used for the state backend's read operations. The given object may already carry
+ * pre-defined options from the state backend (e.g. total order seek is enabled).
+ *
+ * <p>It is important to set the options on the current object and return the result from
+ * the setter methods, otherwise the pre-defined options may get lost.
+ *
+ * @param currentOptions The options object with the pre-defined options.
+ * @param handlesToClose The collection in which to register newly created {@link org.rocksdb.RocksObject}s for closing.
+ * @return The options object on which the additional options are set (by default {@code currentOptions} unchanged).
+ */
+ default ReadOptions createReadOptions(ReadOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
+ return currentOptions;
+ }
}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
index c5227a5..8c4d957 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
@@ -26,7 +26,9 @@ import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Cache;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
+import org.rocksdb.ReadOptions;
import org.rocksdb.TableFormatConfig;
+import org.rocksdb.WriteOptions;
import javax.annotation.Nullable;
@@ -138,6 +140,38 @@ public final class RocksDBResourceContainer implements AutoCloseable {
return opt;
}
+ /**
+ * Creates a new RocksDB {@link WriteOptions} for write operations, registered to be closed with this container.
+ */
+ public WriteOptions getWriteOptions() {
+ // Disable WAL by default. NOTE(review): every call allocates a new native object, so callers get distinct instances.
+ WriteOptions opt = new WriteOptions().setDisableWAL(true);
+ handlesToClose.add(opt);
+
+ // let the user-defined options factory, if specified, refine the defaults
+ if (optionsFactory != null) {
+ opt = optionsFactory.createWriteOptions(opt, handlesToClose);
+ }
+
+ return opt;
+ }
+
+ /**
+ * Creates a new RocksDB {@link ReadOptions} for read operations, registered to be closed with this container.
+ */
+ public ReadOptions getReadOptions() {
+ // We ensure total order seek by default to prevent user misuse, see FLINK-17800 for more details
+ ReadOptions opt = RocksDBOperationUtils.createTotalOrderSeekReadOptions(); // NOTE(review): new native object per call
+ handlesToClose.add(opt);
+
+ // let the user-defined options factory, if specified, refine the defaults
+ if (optionsFactory != null) {
+ opt = optionsFactory.createReadOptions(opt, handlesToClose);
+ }
+
+ return opt;
+ }
+
RocksDBNativeMetricOptions getMemoryWatcherOptions(RocksDBNativeMetricOptions defaultMetricOptions) {
return optionsFactory == null
? defaultMetricOptions
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainerTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainerTest.java
index 73d4bab..eb76e62 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainerTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainerTest.java
@@ -31,7 +31,9 @@ import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.LRUCache;
import org.rocksdb.NativeLibraryLoader;
+import org.rocksdb.ReadOptions;
import org.rocksdb.WriteBufferManager;
+import org.rocksdb.WriteOptions;
import java.io.IOException;
import java.lang.reflect.Field;
@@ -246,4 +248,16 @@ public class RocksDBResourceContainerTest {
assertThat(cache.isOwningHandle(), is(false));
assertThat(wbm.isOwningHandle(), is(false));
}
+
+ @Test
+ public void testFreeWriteReadOptionsAfterClose() throws Exception {
+ RocksDBResourceContainer container = new RocksDBResourceContainer();
+ WriteOptions writeOptions = container.getWriteOptions();
+ ReadOptions readOptions = container.getReadOptions();
+ assertThat(writeOptions.isOwningHandle(), is(true)); // native handles are live while the container is open
+ assertThat(readOptions.isOwningHandle(), is(true));
+ container.close(); // must release the options handed out by the getters above
+ assertThat(writeOptions.isOwningHandle(), is(false));
+ assertThat(readOptions.isOwningHandle(), is(false));
+ }
}