You are viewing a plain-text version of this content. The canonical link for it was a hyperlink ("here") that is not preserved in this plain-text version.
Posted to commits@flink.apache.org by li...@apache.org on 2020/06/26 14:34:04 UTC

[flink] 03/03: [FLINK-17800][rocksdb] Support customized RocksDB write/read options and use RocksDBResourceContainer to get them

This is an automated email from the ASF dual-hosted git repository.

liyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1718f50645ddc01d5e2e13cc5627bafe98191fa2
Author: Yu Li <li...@apache.org>
AuthorDate: Tue Jun 16 14:01:29 2020 +0800

    [FLINK-17800][rocksdb] Support customized RocksDB write/read options and use RocksDBResourceContainer to get them
---
 .../streaming/state/RocksDBKeyedStateBackend.java  |  8 ++---
 .../state/RocksDBKeyedStateBackendBuilder.java     | 17 ++---------
 .../streaming/state/RocksDBOptionsFactory.java     | 34 ++++++++++++++++++++++
 .../streaming/state/RocksDBResourceContainer.java  | 34 ++++++++++++++++++++++
 .../state/RocksDBResourceContainerTest.java        | 14 +++++++++
 5 files changed, 86 insertions(+), 21 deletions(-)

diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index f0cce0b..24b897a 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -219,8 +219,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		ExecutionConfig executionConfig,
 		TtlTimeProvider ttlTimeProvider,
 		RocksDB db,
-		WriteOptions writeOptions,
-		ReadOptions readOptions,
 		LinkedHashMap<String, RocksDbKvStateInfo> kvStateInformation,
 		int keyGroupPrefixBytes,
 		CloseableRegistry cancelStreamRegistry,
@@ -259,8 +257,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		this.keyGroupPrefixBytes = keyGroupPrefixBytes;
 		this.kvStateInformation = kvStateInformation;
 
-		this.writeOptions = writeOptions;
-		this.readOptions = readOptions;
+		this.writeOptions = optionsContainer.getWriteOptions();
+		this.readOptions = optionsContainer.getReadOptions();
 		checkArgument(writeBatchSize >= 0, "Write batch size have to be no negative value.");
 		this.writeBatchSize = writeBatchSize;
 		this.db = db;
@@ -370,8 +368,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			columnFamilyOptions.forEach(IOUtils::closeQuietly);
 
 			IOUtils.closeQuietly(optionsContainer);
-			IOUtils.closeQuietly(readOptions);
-			IOUtils.closeQuietly(writeOptions);
 
 			ttlCompactFiltersManager.disposeAndClearRegisteredCompactionFactories();
 
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
index b9bdbb8..1846bde 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
@@ -54,9 +54,7 @@ import org.apache.flink.util.ResourceGuard;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
-import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
-import org.rocksdb.WriteOptions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -243,9 +241,6 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
 		ColumnFamilyHandle defaultColumnFamilyHandle = null;
 		RocksDBNativeMetricMonitor nativeMetricMonitor = null;
 		CloseableRegistry cancelStreamRegistryForBackend = new CloseableRegistry();
-		// The write options to use in the states.
-		WriteOptions writeOptions = null;
-		ReadOptions readOptions = null;
 		LinkedHashMap<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation = new LinkedHashMap<>();
 		RocksDB db = null;
 		AbstractRocksDBRestoreOperation restoreOperation = null;
@@ -283,9 +278,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
 				}
 			}
 
-			writeOptions = new WriteOptions().setDisableWAL(true);
-			readOptions = RocksDBOperationUtils.createTotalOrderSeekReadOptions();
-			writeBatchWrapper = new RocksDBWriteBatchWrapper(db, writeOptions, writeBatchSize);
+			writeBatchWrapper = new RocksDBWriteBatchWrapper(db, optionsContainer.getWriteOptions(), writeBatchSize);
 			// it is important that we only create the key builder after the restore, and not before;
 			// restore operations may reconfigure the key serializer, so accessing the key serializer
 			// only now we can be certain that the key serializer used in the builder is final.
@@ -301,7 +294,6 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
 				keyGroupPrefixBytes,
 				kvStateInformation,
 				db,
-				readOptions,
 				writeBatchWrapper,
 				nativeMetricMonitor);
 		} catch (Throwable e) {
@@ -321,8 +313,6 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
 			IOUtils.closeQuietly(restoreOperation);
 			IOUtils.closeAllQuietly(columnFamilyOptions);
 			IOUtils.closeQuietly(optionsContainer);
-			IOUtils.closeQuietly(readOptions);
-			IOUtils.closeQuietly(writeOptions);
 			ttlCompactFiltersManager.disposeAndClearRegisteredCompactionFactories();
 			kvStateInformation.clear();
 			try {
@@ -353,8 +343,6 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
 			this.executionConfig,
 			this.ttlTimeProvider,
 			db,
-			writeOptions,
-			readOptions,
 			kvStateInformation,
 			keyGroupPrefixBytes,
 			cancelStreamRegistryForBackend,
@@ -483,7 +471,6 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
 		int keyGroupPrefixBytes,
 		Map<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation,
 		RocksDB db,
-		ReadOptions readOptions,
 		RocksDBWriteBatchWrapper writeBatchWrapper,
 		RocksDBNativeMetricMonitor nativeMetricMonitor) {
 		PriorityQueueSetFactory priorityQueueFactory;
@@ -498,7 +485,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
 					numberOfKeyGroups,
 					kvStateInformation,
 					db,
-					readOptions,
+					optionsContainer.getReadOptions(),
 					writeBatchWrapper,
 					nativeMetricMonitor,
 					columnFamilyOptionsFactory
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactory.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactory.java
index 7e86d22..487b3da6 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactory.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactory.java
@@ -20,6 +20,8 @@ package org.apache.flink.contrib.streaming.state;
 
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.WriteOptions;
 
 import java.util.Collection;
 
@@ -90,4 +92,36 @@ public interface RocksDBOptionsFactory extends java.io.Serializable {
 	default RocksDBNativeMetricOptions createNativeMetricsOptions(RocksDBNativeMetricOptions nativeMetricOptions) {
 		return nativeMetricOptions;
 	}
+
+	/**
+	 * This method should set the additional options on top of the current options object.
+	 * The current options object may contain pre-defined options based on flags that have
+	 * been configured on the state backend.
+	 *
+	 * <p>It is important to set the options on the current object and return the result from
+	 * the setter methods, otherwise the pre-defined options may get lost.
+	 *
+	 * @param currentOptions The options object with the pre-defined options.
+	 * @param handlesToClose The collection to register newly created {@link org.rocksdb.RocksObject}s.
+	 * @return The options object on which the additional options are set.
+	 */
+	default WriteOptions createWriteOptions(WriteOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
+		return currentOptions;
+	}
+
+	/**
+	 * This method should set the additional options on top of the current options object.
+	 * The current options object may contain pre-defined options based on flags that have
+	 * been configured on the state backend.
+	 *
+	 * <p>It is important to set the options on the current object and return the result from
+	 * the setter methods, otherwise the pre-defined options may get lost.
+	 *
+	 * @param currentOptions The options object with the pre-defined options.
+	 * @param handlesToClose The collection to register newly created {@link org.rocksdb.RocksObject}s.
+	 * @return The options object on which the additional options are set.
+	 */
+	default ReadOptions createReadOptions(ReadOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
+		return currentOptions;
+	}
 }
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
index c5227a5..8c4d957 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
@@ -26,7 +26,9 @@ import org.rocksdb.BlockBasedTableConfig;
 import org.rocksdb.Cache;
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
+import org.rocksdb.ReadOptions;
 import org.rocksdb.TableFormatConfig;
+import org.rocksdb.WriteOptions;
 
 import javax.annotation.Nullable;
 
@@ -138,6 +140,38 @@ public final class RocksDBResourceContainer implements AutoCloseable {
 		return opt;
 	}
 
+	/**
+	 * Gets the RocksDB {@link WriteOptions} to be used for write operations.
+	 */
+	public WriteOptions getWriteOptions() {
+		// Disable WAL by default
+		WriteOptions opt = new WriteOptions().setDisableWAL(true);
+		handlesToClose.add(opt);
+
+		// add user-defined options factory, if specified
+		if (optionsFactory != null) {
+			opt = optionsFactory.createWriteOptions(opt, handlesToClose);
+		}
+
+		return opt;
+	}
+
+	/**
+	 * Gets the RocksDB {@link ReadOptions} to be used for read operations.
+	 */
+	public ReadOptions getReadOptions() {
+		// We ensure total order seek by default to prevent user misuse, see FLINK-17800 for more details
+		ReadOptions opt = RocksDBOperationUtils.createTotalOrderSeekReadOptions();
+		handlesToClose.add(opt);
+
+		// add user-defined options factory, if specified
+		if (optionsFactory != null) {
+			opt = optionsFactory.createReadOptions(opt, handlesToClose);
+		}
+
+		return opt;
+	}
+
 	RocksDBNativeMetricOptions getMemoryWatcherOptions(RocksDBNativeMetricOptions defaultMetricOptions) {
 		return optionsFactory == null
 				? defaultMetricOptions
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainerTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainerTest.java
index 73d4bab..eb76e62 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainerTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainerTest.java
@@ -31,7 +31,9 @@ import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
 import org.rocksdb.LRUCache;
 import org.rocksdb.NativeLibraryLoader;
+import org.rocksdb.ReadOptions;
 import org.rocksdb.WriteBufferManager;
+import org.rocksdb.WriteOptions;
 
 import java.io.IOException;
 import java.lang.reflect.Field;
@@ -246,4 +248,16 @@ public class RocksDBResourceContainerTest {
 		assertThat(cache.isOwningHandle(), is(false));
 		assertThat(wbm.isOwningHandle(), is(false));
 	}
+
+	@Test
+	public void testFreeWriteReadOptionsAfterClose() throws Exception {
+		RocksDBResourceContainer container = new RocksDBResourceContainer();
+		WriteOptions writeOptions = container.getWriteOptions();
+		ReadOptions readOptions = container.getReadOptions();
+		assertThat(writeOptions.isOwningHandle(), is(true));
+		assertThat(readOptions.isOwningHandle(), is(true));
+		container.close();
+		assertThat(writeOptions.isOwningHandle(), is(false));
+		assertThat(readOptions.isOwningHandle(), is(false));
+	}
 }