You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2020/06/17 12:33:11 UTC

[flink] 02/03: Revert "[FLINK-17800][roksdb] Support customized RocksDB write/read options and use RocksDBResourceContainer to get them"

This is an automated email from the ASF dual-hosted git repository.

rmetzger pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6e367fb06a3f02360bd8e9216dd477d3e3b68186
Author: Robert Metzger <rm...@apache.org>
AuthorDate: Wed Jun 17 10:13:19 2020 +0200

    Revert "[FLINK-17800][roksdb] Support customized RocksDB write/read options and use RocksDBResourceContainer to get them"
    
    This reverts commit f1250625b2ade530fa2619d6e1bb734832748d31.
---
 .../streaming/state/RocksDBKeyedStateBackend.java  |  8 +++--
 .../state/RocksDBKeyedStateBackendBuilder.java     | 17 +++++++++--
 .../streaming/state/RocksDBOptionsFactory.java     | 34 ----------------------
 .../streaming/state/RocksDBResourceContainer.java  | 34 ----------------------
 .../state/RocksDBResourceContainerTest.java        | 14 ---------
 5 files changed, 21 insertions(+), 86 deletions(-)

diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 24b897a..f0cce0b 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -219,6 +219,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		ExecutionConfig executionConfig,
 		TtlTimeProvider ttlTimeProvider,
 		RocksDB db,
+		WriteOptions writeOptions,
+		ReadOptions readOptions,
 		LinkedHashMap<String, RocksDbKvStateInfo> kvStateInformation,
 		int keyGroupPrefixBytes,
 		CloseableRegistry cancelStreamRegistry,
@@ -257,8 +259,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		this.keyGroupPrefixBytes = keyGroupPrefixBytes;
 		this.kvStateInformation = kvStateInformation;
 
-		this.writeOptions = optionsContainer.getWriteOptions();
-		this.readOptions = optionsContainer.getReadOptions();
+		this.writeOptions = writeOptions;
+		this.readOptions = readOptions;
 		checkArgument(writeBatchSize >= 0, "Write batch size have to be no negative value.");
 		this.writeBatchSize = writeBatchSize;
 		this.db = db;
@@ -368,6 +370,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			columnFamilyOptions.forEach(IOUtils::closeQuietly);
 
 			IOUtils.closeQuietly(optionsContainer);
+			IOUtils.closeQuietly(readOptions);
+			IOUtils.closeQuietly(writeOptions);
 
 			ttlCompactFiltersManager.disposeAndClearRegisteredCompactionFactories();
 
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
index 1846bde..b9bdbb8 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
@@ -54,7 +54,9 @@ import org.apache.flink.util.ResourceGuard;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
+import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
+import org.rocksdb.WriteOptions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -241,6 +243,9 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
 		ColumnFamilyHandle defaultColumnFamilyHandle = null;
 		RocksDBNativeMetricMonitor nativeMetricMonitor = null;
 		CloseableRegistry cancelStreamRegistryForBackend = new CloseableRegistry();
+		// The write and read options to use in the states.
+		WriteOptions writeOptions = null;
+		ReadOptions readOptions = null;
 		LinkedHashMap<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation = new LinkedHashMap<>();
 		RocksDB db = null;
 		AbstractRocksDBRestoreOperation restoreOperation = null;
@@ -278,7 +283,9 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
 				}
 			}
 
-			writeBatchWrapper = new RocksDBWriteBatchWrapper(db, optionsContainer.getWriteOptions(), writeBatchSize);
+			writeOptions = new WriteOptions().setDisableWAL(true);
+			readOptions = RocksDBOperationUtils.createTotalOrderSeekReadOptions();
+			writeBatchWrapper = new RocksDBWriteBatchWrapper(db, writeOptions, writeBatchSize);
 			// it is important that we only create the key builder after the restore, and not before;
 			// restore operations may reconfigure the key serializer, so accessing the key serializer
 			// only now we can be certain that the key serializer used in the builder is final.
@@ -294,6 +301,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
 				keyGroupPrefixBytes,
 				kvStateInformation,
 				db,
+				readOptions,
 				writeBatchWrapper,
 				nativeMetricMonitor);
 		} catch (Throwable e) {
@@ -313,6 +321,8 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
 			IOUtils.closeQuietly(restoreOperation);
 			IOUtils.closeAllQuietly(columnFamilyOptions);
 			IOUtils.closeQuietly(optionsContainer);
+			IOUtils.closeQuietly(readOptions);
+			IOUtils.closeQuietly(writeOptions);
 			ttlCompactFiltersManager.disposeAndClearRegisteredCompactionFactories();
 			kvStateInformation.clear();
 			try {
@@ -343,6 +353,8 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
 			this.executionConfig,
 			this.ttlTimeProvider,
 			db,
+			writeOptions,
+			readOptions,
 			kvStateInformation,
 			keyGroupPrefixBytes,
 			cancelStreamRegistryForBackend,
@@ -471,6 +483,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
 		int keyGroupPrefixBytes,
 		Map<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation,
 		RocksDB db,
+		ReadOptions readOptions,
 		RocksDBWriteBatchWrapper writeBatchWrapper,
 		RocksDBNativeMetricMonitor nativeMetricMonitor) {
 		PriorityQueueSetFactory priorityQueueFactory;
@@ -485,7 +498,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
 					numberOfKeyGroups,
 					kvStateInformation,
 					db,
-					optionsContainer.getReadOptions(),
+					readOptions,
 					writeBatchWrapper,
 					nativeMetricMonitor,
 					columnFamilyOptionsFactory
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactory.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactory.java
index 5ee5098..a5fd1e9 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactory.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactory.java
@@ -20,8 +20,6 @@ package org.apache.flink.contrib.streaming.state;
 
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
-import org.rocksdb.ReadOptions;
-import org.rocksdb.WriteOptions;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -95,38 +93,6 @@ public interface RocksDBOptionsFactory extends OptionsFactory, java.io.Serializa
 		return nativeMetricOptions;
 	}
 
-	/**
-	 * This method should set the additional options on top of the current options object.
-	 * The current options object may contain pre-defined options based on flags that have
-	 * been configured on the state backend.
-	 *
-	 * <p>It is important to set the options on the current object and return the result from
-	 * the setter methods, otherwise the pre-defined options may get lost.
-	 *
-	 * @param currentOptions The options object with the pre-defined options.
-	 * @param handlesToClose The collection to register newly created {@link org.rocksdb.RocksObject}s.
-	 * @return The options object on which the additional options are set.
-	 */
-	default WriteOptions createWriteOptions(WriteOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
-		return currentOptions;
-	}
-
-	/**
-	 * This method should set the additional options on top of the current options object.
-	 * The current options object may contain pre-defined options based on flags that have
-	 * been configured on the state backend.
-	 *
-	 * <p>It is important to set the options on the current object and return the result from
-	 * the setter methods, otherwise the pre-defined options may get lost.
-	 *
-	 * @param currentOptions The options object with the pre-defined options.
-	 * @param handlesToClose The collection to register newly created {@link org.rocksdb.RocksObject}s.
-	 * @return The options object on which the additional options are set.
-	 */
-	default ReadOptions createReadOptions(ReadOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
-		return currentOptions;
-	}
-
 	// ------------------------------------------------------------------------
 	//  for compatibility
 	// ------------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
index 8c4d957..c5227a5 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
@@ -26,9 +26,7 @@ import org.rocksdb.BlockBasedTableConfig;
 import org.rocksdb.Cache;
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
-import org.rocksdb.ReadOptions;
 import org.rocksdb.TableFormatConfig;
-import org.rocksdb.WriteOptions;
 
 import javax.annotation.Nullable;
 
@@ -140,38 +138,6 @@ public final class RocksDBResourceContainer implements AutoCloseable {
 		return opt;
 	}
 
-	/**
-	 * Gets the RocksDB {@link WriteOptions} to be used for write operations.
-	 */
-	public WriteOptions getWriteOptions() {
-		// Disable WAL by default
-		WriteOptions opt = new WriteOptions().setDisableWAL(true);
-		handlesToClose.add(opt);
-
-		// add user-defined options factory, if specified
-		if (optionsFactory != null) {
-			opt = optionsFactory.createWriteOptions(opt, handlesToClose);
-		}
-
-		return opt;
-	}
-
-	/**
-	 * Gets the RocksDB {@link ReadOptions} to be used for read operations.
-	 */
-	public ReadOptions getReadOptions() {
-		// We ensure total order seek by default to prevent user misuse, see FLINK-17800 for more details
-		ReadOptions opt = RocksDBOperationUtils.createTotalOrderSeekReadOptions();
-		handlesToClose.add(opt);
-
-		// add user-defined options factory, if specified
-		if (optionsFactory != null) {
-			opt = optionsFactory.createReadOptions(opt, handlesToClose);
-		}
-
-		return opt;
-	}
-
 	RocksDBNativeMetricOptions getMemoryWatcherOptions(RocksDBNativeMetricOptions defaultMetricOptions) {
 		return optionsFactory == null
 				? defaultMetricOptions
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainerTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainerTest.java
index eb76e62..73d4bab 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainerTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainerTest.java
@@ -31,9 +31,7 @@ import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
 import org.rocksdb.LRUCache;
 import org.rocksdb.NativeLibraryLoader;
-import org.rocksdb.ReadOptions;
 import org.rocksdb.WriteBufferManager;
-import org.rocksdb.WriteOptions;
 
 import java.io.IOException;
 import java.lang.reflect.Field;
@@ -248,16 +246,4 @@ public class RocksDBResourceContainerTest {
 		assertThat(cache.isOwningHandle(), is(false));
 		assertThat(wbm.isOwningHandle(), is(false));
 	}
-
-	@Test
-	public void testFreeWriteReadOptionsAfterClose() throws Exception {
-		RocksDBResourceContainer container = new RocksDBResourceContainer();
-		WriteOptions writeOptions = container.getWriteOptions();
-		ReadOptions readOptions = container.getReadOptions();
-		assertThat(writeOptions.isOwningHandle(), is(true));
-		assertThat(readOptions.isOwningHandle(), is(true));
-		container.close();
-		assertThat(writeOptions.isOwningHandle(), is(false));
-		assertThat(readOptions.isOwningHandle(), is(false));
-	}
 }