You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2020/06/17 12:33:09 UTC

[flink] branch release-1.11 updated (d94219d -> cdac5e3)

This is an automated email from the ASF dual-hosted git repository.

rmetzger pushed a change to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from d94219d  [FLINK-18330][python][legal] Update the NOTICE file of flink-python module adding beam-runners-core-java and beam-vendor-bytebuddy
     new 6a5213e  [FLINK-16795][e2e] Increase e2e execution timeout +20m
     new 6e367fb  Revert "[FLINK-17800][roksdb] Support customized RocksDB write/read options and use RocksDBResourceContainer to get them"
     new cdac5e3  Revert "[FLINK-17800][roksdb] Ensure total order seek to avoid user misuse"

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../state/RocksDBCachingPriorityQueueSet.java      |   8 +-
 .../state/RocksDBIncrementalCheckpointUtils.java   |   4 +-
 .../streaming/state/RocksDBKeyedStateBackend.java  |  21 +--
 .../state/RocksDBKeyedStateBackendBuilder.java     |  16 +--
 .../contrib/streaming/state/RocksDBMapState.java   |   6 +-
 .../streaming/state/RocksDBOperationUtils.java     |  15 +--
 .../streaming/state/RocksDBOptionsFactory.java     |  34 -----
 .../state/RocksDBPriorityQueueSetFactory.java      |   9 +-
 .../streaming/state/RocksDBResourceContainer.java  |  34 -----
 .../RocksDBIncrementalRestoreOperation.java        |   7 +-
 ...rtitionedPriorityQueueWithRocksDBStoreTest.java |   1 -
 .../contrib/streaming/state/RocksDBResource.java   |   4 +-
 .../state/RocksDBResourceContainerTest.java        |  14 --
 .../state/RocksDBRocksStateKeysIteratorTest.java   |   2 +-
 .../state/RocksDBStateBackendConfigTest.java       |  28 +++-
 .../state/RocksDBStateMisuseOptionTest.java        | 147 ---------------------
 .../contrib/streaming/state/RocksDBTestUtils.java  |  21 ---
 ...RocksKeyGroupsRocksSingleStateIteratorTest.java |   6 +-
 .../state/benchmark/RocksDBPerformanceTest.java    |   3 +-
 tools/azure-pipelines/jobs-template.yml            |   2 +-
 20 files changed, 54 insertions(+), 328 deletions(-)
 delete mode 100644 flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateMisuseOptionTest.java


[flink] 03/03: Revert "[FLINK-17800][roksdb] Ensure total order seek to avoid user misuse"

Posted by rm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rmetzger pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit cdac5e32eb2d3348d711207c96c91f97a17aa2d9
Author: Robert Metzger <rm...@apache.org>
AuthorDate: Wed Jun 17 10:13:30 2020 +0200

    Revert "[FLINK-17800][roksdb] Ensure total order seek to avoid user misuse"
    
    This reverts commit 8ca388ca0225ff22f532c8a65f97d8cfea027c22.
---
 .../state/RocksDBCachingPriorityQueueSet.java      |   8 +-
 .../state/RocksDBIncrementalCheckpointUtils.java   |   4 +-
 .../streaming/state/RocksDBKeyedStateBackend.java  |  23 +---
 .../state/RocksDBKeyedStateBackendBuilder.java     |  17 +--
 .../contrib/streaming/state/RocksDBMapState.java   |   6 +-
 .../streaming/state/RocksDBOperationUtils.java     |  15 +--
 .../state/RocksDBPriorityQueueSetFactory.java      |   9 +-
 .../RocksDBIncrementalRestoreOperation.java        |   7 +-
 ...rtitionedPriorityQueueWithRocksDBStoreTest.java |   1 -
 .../contrib/streaming/state/RocksDBResource.java   |   4 +-
 .../state/RocksDBRocksStateKeysIteratorTest.java   |   2 +-
 .../state/RocksDBStateBackendConfigTest.java       |  28 +++-
 .../state/RocksDBStateMisuseOptionTest.java        | 147 ---------------------
 .../contrib/streaming/state/RocksDBTestUtils.java  |  21 ---
 ...RocksKeyGroupsRocksSingleStateIteratorTest.java |   6 +-
 .../state/benchmark/RocksDBPerformanceTest.java    |   3 +-
 16 files changed, 46 insertions(+), 255 deletions(-)

diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBCachingPriorityQueueSet.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBCachingPriorityQueueSet.java
index fb9a833..364185a 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBCachingPriorityQueueSet.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBCachingPriorityQueueSet.java
@@ -29,7 +29,6 @@ import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.shaded.guava18.com.google.common.primitives.UnsignedBytes;
 
 import org.rocksdb.ColumnFamilyHandle;
-import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
 
@@ -64,9 +63,6 @@ public class RocksDBCachingPriorityQueueSet<E extends HeapPriorityQueueElement>
 	@Nonnull
 	private final RocksDB db;
 
-	@Nonnull
-	private final ReadOptions readOptions;
-
 	/** Handle to the column family of the RocksDB instance in which the elements are stored. */
 	@Nonnull
 	private final ColumnFamilyHandle columnFamilyHandle;
@@ -116,7 +112,6 @@ public class RocksDBCachingPriorityQueueSet<E extends HeapPriorityQueueElement>
 		@Nonnegative int keyGroupId,
 		@Nonnegative int keyGroupPrefixBytes,
 		@Nonnull RocksDB db,
-		@Nonnull ReadOptions readOptions,
 		@Nonnull ColumnFamilyHandle columnFamilyHandle,
 		@Nonnull TypeSerializer<E> byteOrderProducingSerializer,
 		@Nonnull DataOutputSerializer outputStream,
@@ -124,7 +119,6 @@ public class RocksDBCachingPriorityQueueSet<E extends HeapPriorityQueueElement>
 		@Nonnull RocksDBWriteBatchWrapper batchWrapper,
 		@Nonnull OrderedByteArraySetCache orderedByteArraySetCache) {
 		this.db = db;
-		this.readOptions = readOptions;
 		this.columnFamilyHandle = columnFamilyHandle;
 		this.byteOrderProducingSerializer = byteOrderProducingSerializer;
 		this.batchWrapper = batchWrapper;
@@ -310,7 +304,7 @@ public class RocksDBCachingPriorityQueueSet<E extends HeapPriorityQueueElement>
 		flushWriteBatch();
 		return new RocksBytesIterator(
 			new RocksIteratorWrapper(
-				db.newIterator(columnFamilyHandle, readOptions)));
+				db.newIterator(columnFamilyHandle)));
 	}
 
 	/**
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java
index 5bce695..1f43dd0 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java
@@ -21,7 +21,6 @@ import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 
 import org.rocksdb.ColumnFamilyHandle;
-import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
 
@@ -117,8 +116,7 @@ public class RocksDBIncrementalCheckpointUtils {
 		@Nonnegative long writeBatchSize) throws RocksDBException {
 
 		for (ColumnFamilyHandle columnFamilyHandle : columnFamilyHandles) {
-			try (ReadOptions readOptions = RocksDBOperationUtils.createTotalOrderSeekReadOptions();
-				RocksIteratorWrapper iteratorWrapper = RocksDBOperationUtils.getRocksIterator(db, columnFamilyHandle, readOptions);
+			try (RocksIteratorWrapper iteratorWrapper = RocksDBOperationUtils.getRocksIterator(db, columnFamilyHandle);
 				RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(db, writeBatchSize)) {
 
 				iteratorWrapper.seek(beginKeyBytes);
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index f0cce0b..61d8688 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -66,7 +66,6 @@ import org.apache.flink.util.StateMigrationException;
 
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.ColumnFamilyOptions;
-import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
 import org.rocksdb.Snapshot;
@@ -151,12 +150,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	private final WriteOptions writeOptions;
 
 	/**
-	 * The read options to use when creating iterators.
-	 * We ensure total order seek in case user misuse, see FLINK-17800 for more details.
-	 */
-	private final ReadOptions readOptions;
-
-	/**
 	 * The max memory size for one batch in {@link RocksDBWriteBatchWrapper}.
 	 */
 	private final long writeBatchSize;
@@ -219,8 +212,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		ExecutionConfig executionConfig,
 		TtlTimeProvider ttlTimeProvider,
 		RocksDB db,
-		WriteOptions writeOptions,
-		ReadOptions readOptions,
 		LinkedHashMap<String, RocksDbKvStateInfo> kvStateInformation,
 		int keyGroupPrefixBytes,
 		CloseableRegistry cancelStreamRegistry,
@@ -259,8 +250,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		this.keyGroupPrefixBytes = keyGroupPrefixBytes;
 		this.kvStateInformation = kvStateInformation;
 
-		this.writeOptions = writeOptions;
-		this.readOptions = readOptions;
+		this.writeOptions = new WriteOptions().setDisableWAL(true);
 		checkArgument(writeBatchSize >= 0, "Write batch size have to be no negative value.");
 		this.writeBatchSize = writeBatchSize;
 		this.db = db;
@@ -300,7 +290,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			throw new FlinkRuntimeException("Failed to get keys from RocksDB state backend.", ex);
 		}
 
-		RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(db, columnInfo.columnFamilyHandle, readOptions);
+		RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(db, columnInfo.columnFamilyHandle);
 		iterator.seekToFirst();
 
 		final RocksStateKeysIterator<K> iteratorWrapper = new RocksStateKeysIterator<>(iterator, state, getKeySerializer(), keyGroupPrefixBytes,
@@ -370,7 +360,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			columnFamilyOptions.forEach(IOUtils::closeQuietly);
 
 			IOUtils.closeQuietly(optionsContainer);
-			IOUtils.closeQuietly(readOptions);
 			IOUtils.closeQuietly(writeOptions);
 
 			ttlCompactFiltersManager.disposeAndClearRegisteredCompactionFactories();
@@ -418,10 +407,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		return writeOptions;
 	}
 
-	public ReadOptions getReadOptions() {
-		return readOptions;
-	}
-
 	RocksDBSerializedCompositeKeyBuilder<K> getSharedRocksKeyBuilder() {
 		return sharedRocksKeyBuilder;
 	}
@@ -621,7 +606,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 		Snapshot rocksDBSnapshot = db.getSnapshot();
 		try (
-			RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(db, stateMetaInfo.f0, readOptions);
+			RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(db, stateMetaInfo.f0);
 			RocksDBWriteBatchWrapper batchWriter = new RocksDBWriteBatchWrapper(db, getWriteOptions(), getWriteBatchSize())
 		) {
 			iterator.seekToFirst();
@@ -696,7 +681,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 		for (RocksDbKvStateInfo metaInfo : kvStateInformation.values()) {
 			//TODO maybe filterOrTransform only for k/v states
-			try (RocksIteratorWrapper rocksIterator = RocksDBOperationUtils.getRocksIterator(db, metaInfo.columnFamilyHandle, readOptions)) {
+			try (RocksIteratorWrapper rocksIterator = RocksDBOperationUtils.getRocksIterator(db, metaInfo.columnFamilyHandle)) {
 				rocksIterator.seekToFirst();
 
 				while (rocksIterator.isValid()) {
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
index b9bdbb8..d1a0184 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
@@ -54,7 +54,6 @@ import org.apache.flink.util.ResourceGuard;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
-import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
 import org.rocksdb.WriteOptions;
 import org.slf4j.Logger;
@@ -245,7 +244,6 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
 		CloseableRegistry cancelStreamRegistryForBackend = new CloseableRegistry();
 		// The write options to use in the states.
 		WriteOptions writeOptions = null;
-		ReadOptions readOptions = null;
 		LinkedHashMap<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation = new LinkedHashMap<>();
 		RocksDB db = null;
 		AbstractRocksDBRestoreOperation restoreOperation = null;
@@ -284,7 +282,6 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
 			}
 
 			writeOptions = new WriteOptions().setDisableWAL(true);
-			readOptions = RocksDBOperationUtils.createTotalOrderSeekReadOptions();
 			writeBatchWrapper = new RocksDBWriteBatchWrapper(db, writeOptions, writeBatchSize);
 			// it is important that we only create the key builder after the restore, and not before;
 			// restore operations may reconfigure the key serializer, so accessing the key serializer
@@ -297,13 +294,8 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
 			snapshotStrategy = initializeSavepointAndCheckpointStrategies(cancelStreamRegistryForBackend, rocksDBResourceGuard,
 				kvStateInformation, keyGroupPrefixBytes, db, backendUID, materializedSstFiles, lastCompletedCheckpointId);
 			// init priority queue factory
-			priorityQueueFactory = initPriorityQueueFactory(
-				keyGroupPrefixBytes,
-				kvStateInformation,
-				db,
-				readOptions,
-				writeBatchWrapper,
-				nativeMetricMonitor);
+			priorityQueueFactory = initPriorityQueueFactory(keyGroupPrefixBytes, kvStateInformation, db,
+				writeBatchWrapper, nativeMetricMonitor);
 		} catch (Throwable e) {
 			// Do clean up
 			List<ColumnFamilyOptions> columnFamilyOptions = new ArrayList<>(kvStateInformation.values().size());
@@ -321,7 +313,6 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
 			IOUtils.closeQuietly(restoreOperation);
 			IOUtils.closeAllQuietly(columnFamilyOptions);
 			IOUtils.closeQuietly(optionsContainer);
-			IOUtils.closeQuietly(readOptions);
 			IOUtils.closeQuietly(writeOptions);
 			ttlCompactFiltersManager.disposeAndClearRegisteredCompactionFactories();
 			kvStateInformation.clear();
@@ -353,8 +344,6 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
 			this.executionConfig,
 			this.ttlTimeProvider,
 			db,
-			writeOptions,
-			readOptions,
 			kvStateInformation,
 			keyGroupPrefixBytes,
 			cancelStreamRegistryForBackend,
@@ -483,7 +472,6 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
 		int keyGroupPrefixBytes,
 		Map<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation,
 		RocksDB db,
-		ReadOptions readOptions,
 		RocksDBWriteBatchWrapper writeBatchWrapper,
 		RocksDBNativeMetricMonitor nativeMetricMonitor) {
 		PriorityQueueSetFactory priorityQueueFactory;
@@ -498,7 +486,6 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
 					numberOfKeyGroups,
 					kvStateInformation,
 					db,
-					readOptions,
 					writeBatchWrapper,
 					nativeMetricMonitor,
 					columnFamilyOptionsFactory
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
index 6c7f541..f0b47cd 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
@@ -236,7 +236,7 @@ class RocksDBMapState<K, N, UK, UV>
 	public boolean isEmpty() {
 		final byte[] prefixBytes = serializeCurrentKeyWithGroupAndNamespace();
 
-		try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(backend.db, columnFamily, backend.getReadOptions())) {
+		try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(backend.db, columnFamily)) {
 
 			iterator.seek(prefixBytes);
 
@@ -247,7 +247,7 @@ class RocksDBMapState<K, N, UK, UV>
 	@Override
 	public void clear() {
 		try {
-			try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(backend.db, columnFamily, backend.getReadOptions());
+			try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(backend.db, columnFamily);
 				RocksDBWriteBatchWrapper rocksDBWriteBatchWrapper = new RocksDBWriteBatchWrapper(backend.db, backend.getWriteOptions(), backend.getWriteBatchSize())) {
 
 				final byte[] keyPrefixBytes = serializeCurrentKeyWithGroupAndNamespace();
@@ -570,7 +570,7 @@ class RocksDBMapState<K, N, UK, UV>
 
 			// use try-with-resources to ensure RocksIterator can be release even some runtime exception
 			// occurred in the below code block.
-			try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(db, columnFamily, backend.getReadOptions())) {
+			try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(db, columnFamily)) {
 
 				/*
 				 * The iteration starts from the prefix bytes at the first loading. After #nextEntry() is called,
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java
index 0f564d5..1455c1b 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java
@@ -32,7 +32,6 @@ import org.rocksdb.ColumnFamilyDescriptor;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
-import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
 import org.slf4j.Logger;
@@ -95,18 +94,12 @@ public class RocksDBOperationUtils {
 		return dbRef;
 	}
 
-	public static RocksIteratorWrapper getRocksIterator(RocksDB db, ColumnFamilyHandle columnFamilyHandle, ReadOptions readOptions) {
-		return new RocksIteratorWrapper(db.newIterator(columnFamilyHandle, readOptions));
+	public static RocksIteratorWrapper getRocksIterator(RocksDB db) {
+		return new RocksIteratorWrapper(db.newIterator());
 	}
 
-	/**
-	 * Create a total order read option to avoid user misuse, see FLINK-17800 for more details.
-	 *
-	 * <p>Note, remember to close the generated {@link ReadOptions} when dispose.
-	 */
-	// TODO We would remove this method once we bump RocksDB version larger than 6.2.2.
-	public static ReadOptions createTotalOrderSeekReadOptions() {
-		return new ReadOptions().setTotalOrderSeek(true);
+	public static RocksIteratorWrapper getRocksIterator(RocksDB db, ColumnFamilyHandle columnFamilyHandle) {
+		return new RocksIteratorWrapper(db.newIterator(columnFamilyHandle));
 	}
 
 	public static void registerKvStateInformation(
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBPriorityQueueSetFactory.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBPriorityQueueSetFactory.java
index 59daff8..1cbeebe 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBPriorityQueueSetFactory.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBPriorityQueueSetFactory.java
@@ -17,7 +17,6 @@
 
 package org.apache.flink.contrib.streaming.state;
 
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
 import org.apache.flink.core.memory.DataInputDeserializer;
@@ -37,7 +36,6 @@ import org.apache.flink.util.StateMigrationException;
 
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.ColumnFamilyOptions;
-import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
 
 import javax.annotation.Nonnull;
@@ -54,8 +52,7 @@ public class RocksDBPriorityQueueSetFactory implements PriorityQueueSetFactory {
 	/**
 	 * Default cache size per key-group.
 	 */
-	@VisibleForTesting
-	static final int DEFAULT_CACHES_SIZE = 128; //TODO make this configurable
+	private static final int DEFAULT_CACHES_SIZE = 128; //TODO make this configurable
 
 	/**
 	 * A shared buffer to serialize elements for the priority queue.
@@ -74,7 +71,6 @@ public class RocksDBPriorityQueueSetFactory implements PriorityQueueSetFactory {
 	private final int numberOfKeyGroups;
 	private final Map<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation;
 	private final RocksDB db;
-	private final ReadOptions readOptions;
 	private final RocksDBWriteBatchWrapper writeBatchWrapper;
 	private final RocksDBNativeMetricMonitor nativeMetricMonitor;
 	private final Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory;
@@ -85,7 +81,6 @@ public class RocksDBPriorityQueueSetFactory implements PriorityQueueSetFactory {
 		int numberOfKeyGroups,
 		Map<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation,
 		RocksDB db,
-		ReadOptions readOptions,
 		RocksDBWriteBatchWrapper writeBatchWrapper,
 		RocksDBNativeMetricMonitor nativeMetricMonitor,
 		Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory) {
@@ -94,7 +89,6 @@ public class RocksDBPriorityQueueSetFactory implements PriorityQueueSetFactory {
 		this.numberOfKeyGroups = numberOfKeyGroups;
 		this.kvStateInformation = kvStateInformation;
 		this.db = db;
-		this.readOptions = readOptions;
 		this.writeBatchWrapper = writeBatchWrapper;
 		this.nativeMetricMonitor = nativeMetricMonitor;
 		this.columnFamilyOptionsFactory = columnFamilyOptionsFactory;
@@ -128,7 +122,6 @@ public class RocksDBPriorityQueueSetFactory implements PriorityQueueSetFactory {
 						keyGroupId,
 						keyGroupPrefixBytes,
 						db,
-						readOptions,
 						columnFamilyHandle,
 						byteOrderedElementSerializer,
 						sharedElementOutView,
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
index 53c7537..f7abed6 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
@@ -51,7 +51,6 @@ import org.rocksdb.ColumnFamilyDescriptor;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
-import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
 import org.slf4j.Logger;
@@ -310,7 +309,7 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor
 						null, tmpRestoreDBInfo.stateMetaInfoSnapshots.get(i))
 						.columnFamilyHandle;
 
-					try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(tmpRestoreDBInfo.db, tmpColumnFamilyHandle, tmpRestoreDBInfo.readOptions)) {
+					try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(tmpRestoreDBInfo.db, tmpColumnFamilyHandle)) {
 
 						iterator.seek(startKeyGroupPrefixBytes);
 
@@ -377,8 +376,6 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor
 		@Nonnull
 		private final List<StateMetaInfoSnapshot> stateMetaInfoSnapshots;
 
-		private final ReadOptions readOptions;
-
 		private RestoredDBInstance(
 			@Nonnull RocksDB db,
 			@Nonnull List<ColumnFamilyHandle> columnFamilyHandles,
@@ -389,7 +386,6 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor
 			this.columnFamilyHandles = columnFamilyHandles;
 			this.columnFamilyDescriptors = columnFamilyDescriptors;
 			this.stateMetaInfoSnapshots = stateMetaInfoSnapshots;
-			this.readOptions = RocksDBOperationUtils.createTotalOrderSeekReadOptions();
 		}
 
 		@Override
@@ -401,7 +397,6 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor
 			IOUtils.closeAllQuietly(columnFamilyHandles);
 			IOUtils.closeQuietly(db);
 			IOUtils.closeAllQuietly(columnFamilyOptions);
-			IOUtils.closeQuietly(readOptions);
 		}
 	}
 
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/KeyGroupPartitionedPriorityQueueWithRocksDBStoreTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/KeyGroupPartitionedPriorityQueueWithRocksDBStoreTest.java
index fd68d70..d402c3d 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/KeyGroupPartitionedPriorityQueueWithRocksDBStoreTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/KeyGroupPartitionedPriorityQueueWithRocksDBStoreTest.java
@@ -60,7 +60,6 @@ public class KeyGroupPartitionedPriorityQueueWithRocksDBStoreTest extends Intern
 				keyGroupId,
 				keyGroupPrefixBytes,
 				rocksDBResource.getRocksDB(),
-				rocksDBResource.getReadOptions(),
 				rocksDBResource.getDefaultColumnFamily(),
 				TestElementSerializer.INSTANCE,
 				outputStreamWithPos,
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResource.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResource.java
index 3b3b697..d25baa7 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResource.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResource.java
@@ -101,7 +101,7 @@ public class RocksDBResource extends ExternalResource {
 					LOG.error("Close previous ColumnOptions's instance failed.", e);
 				}
 
-				return PredefinedOptions.FLASH_SSD_OPTIMIZED.createColumnOptions(handlesToClose).optimizeForPointLookup(40960);
+				return PredefinedOptions.FLASH_SSD_OPTIMIZED.createColumnOptions(handlesToClose);
 			}
 		});
 	}
@@ -155,7 +155,7 @@ public class RocksDBResource extends ExternalResource {
 			PredefinedOptions.DEFAULT.createColumnOptions(handlesToClose), handlesToClose);
 		this.writeOptions = new WriteOptions();
 		this.writeOptions.disableWAL();
-		this.readOptions = RocksDBOperationUtils.createTotalOrderSeekReadOptions();
+		this.readOptions = new ReadOptions();
 		this.columnFamilyHandles = new ArrayList<>(1);
 		this.rocksDB = RocksDB.open(
 			dbOptions,
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksStateKeysIteratorTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksStateKeysIteratorTest.java
index 1a4808f..8f71b65 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksStateKeysIteratorTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksStateKeysIteratorTest.java
@@ -126,7 +126,7 @@ public class RocksDBRocksStateKeysIteratorTest {
 			ColumnFamilyHandle handle = keyedStateBackend.getColumnFamilyHandle(testStateName);
 
 			try (
-				RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(keyedStateBackend.db, handle, keyedStateBackend.getReadOptions());
+				RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(keyedStateBackend.db, handle);
 				RocksStateKeysIterator<K> iteratorWrapper =
 					new RocksStateKeysIterator<>(
 						iterator,
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
index 197f7ca..d3eb6db 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
@@ -61,7 +61,6 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 
-import static org.apache.flink.contrib.streaming.state.RocksDBTestUtils.createKeyedStateBackend;
 import static org.hamcrest.CoreMatchers.anyOf;
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -124,7 +123,7 @@ public class RocksDBStateBackendConfigTest {
 		assertArrayEquals(new String[] { testDir1, testDir2 }, rocksDbBackend.getDbStoragePaths());
 
 		final MockEnvironment env = getMockEnvironment(tempFolder.newFolder());
-		final RocksDBKeyedStateBackend<Integer> keyedBackend = createKeyedStateBackend(rocksDbBackend, env, IntSerializer.INSTANCE);
+		final RocksDBKeyedStateBackend<Integer> keyedBackend = createKeyedStateBackend(rocksDbBackend, env);
 
 		try {
 			File instanceBasePath = keyedBackend.getInstanceBasePath();
@@ -161,7 +160,7 @@ public class RocksDBStateBackendConfigTest {
 
 		RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(tempFolder.newFolder().toURI().toString());
 
-		RocksDBKeyedStateBackend<Integer> keyedBackend = createKeyedStateBackend(rocksDbBackend, env, IntSerializer.INSTANCE);
+		RocksDBKeyedStateBackend<Integer> keyedBackend = createKeyedStateBackend(rocksDbBackend, env);
 		Assert.assertEquals(RocksDBPriorityQueueSetFactory.class, keyedBackend.getPriorityQueueFactory().getClass());
 		keyedBackend.dispose();
 
@@ -169,7 +168,7 @@ public class RocksDBStateBackendConfigTest {
 		conf.set(RocksDBOptions.TIMER_SERVICE_FACTORY, RocksDBStateBackend.PriorityQueueStateType.HEAP);
 
 		rocksDbBackend = rocksDbBackend.configure(conf, Thread.currentThread().getContextClassLoader());
-		keyedBackend = createKeyedStateBackend(rocksDbBackend, env, IntSerializer.INSTANCE);
+		keyedBackend = createKeyedStateBackend(rocksDbBackend, env);
 		Assert.assertEquals(
 			HeapPriorityQueueSetFactory.class,
 			keyedBackend.getPriorityQueueFactory().getClass());
@@ -198,7 +197,7 @@ public class RocksDBStateBackendConfigTest {
 		final RocksDBStateBackend configuredRocksDBStateBackend = backend.configure(
 			configFromConfFile,
 			Thread.currentThread().getContextClassLoader());
-		final RocksDBKeyedStateBackend<Integer> keyedBackend = createKeyedStateBackend(configuredRocksDBStateBackend, env, IntSerializer.INSTANCE);
+		final RocksDBKeyedStateBackend<Integer> keyedBackend = createKeyedStateBackend(configuredRocksDBStateBackend, env);
 
 		// priorityQueueStateType of the job backend should be preserved
 		assertThat(keyedBackend.getPriorityQueueFactory(), instanceOf(HeapPriorityQueueSetFactory.class));
@@ -255,7 +254,7 @@ public class RocksDBStateBackendConfigTest {
 		rocksDbBackend.setDbStoragePath(configuredPath);
 
 		final MockEnvironment env = getMockEnvironment(tempFolder.newFolder());
-		RocksDBKeyedStateBackend<Integer> keyedBackend = createKeyedStateBackend(rocksDbBackend, env, IntSerializer.INSTANCE);
+		RocksDBKeyedStateBackend<Integer> keyedBackend = createKeyedStateBackend(rocksDbBackend, env);
 
 		try {
 			File instanceBasePath = keyedBackend.getInstanceBasePath();
@@ -726,6 +725,23 @@ public class RocksDBStateBackendConfigTest {
 	//  Utilities
 	// ------------------------------------------------------------------------
 
+	static RocksDBKeyedStateBackend<Integer> createKeyedStateBackend(
+			RocksDBStateBackend rocksDbBackend, Environment env) throws Exception {
+
+		return (RocksDBKeyedStateBackend<Integer>) rocksDbBackend.createKeyedStateBackend(
+			env,
+			env.getJobID(),
+			"test_op",
+			IntSerializer.INSTANCE,
+			1,
+			new KeyGroupRange(0, 0),
+			env.getTaskKvStateRegistry(),
+			TtlTimeProvider.DEFAULT,
+			new UnregisteredMetricsGroup(),
+			Collections.emptyList(),
+			new CloseableRegistry());
+	}
+
 	static MockEnvironment getMockEnvironment(File... tempDirs) {
 		final String[] tempDirStrings = new String[tempDirs.length];
 		for (int i = 0; i < tempDirs.length; i++) {
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateMisuseOptionTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateMisuseOptionTest.java
deleted file mode 100644
index 59a4822..0000000
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateMisuseOptionTest.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.contrib.streaming.state;
-
-import org.apache.flink.api.common.state.MapState;
-import org.apache.flink.api.common.state.MapStateDescriptor;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.common.typeutils.base.LongSerializer;
-import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
-import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
-import org.apache.flink.runtime.state.VoidNamespace;
-import org.apache.flink.runtime.state.VoidNamespaceSerializer;
-import org.apache.flink.streaming.api.operators.TimerHeapInternalTimer;
-import org.apache.flink.streaming.api.operators.TimerSerializer;
-
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.rocksdb.ColumnFamilyOptions;
-import org.rocksdb.DBOptions;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.PriorityQueue;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
-import static org.apache.flink.contrib.streaming.state.RocksDBTestUtils.createKeyedStateBackend;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Tests to cover cases that even user misuse some options, RocksDB state-backend could still work as expected or give explicit feedback.
- *
- * <p>RocksDB state-backend has some internal operations based on RocksDB's APIs which is transparent for users.
- * However, user could still configure options via {@link RocksDBOptionsFactory}, and might lead some operations
- * could not get expected result, e.g. FLINK-17800
- */
-public class RocksDBStateMisuseOptionTest {
-
-	@Rule
-	public final TemporaryFolder tempFolder = new TemporaryFolder();
-
-	/**
-	 * Tests to cover case when user misuse optimizeForPointLookup with iterator interfaces on map state.
-	 *
-	 * <p>The option {@link ColumnFamilyOptions#optimizeForPointLookup(long)} would lead to iterator.seek with prefix bytes invalid.
-	 */
-	@Test
-	public void testMisuseOptimizePointLookupWithMapState() throws Exception {
-		RocksDBStateBackend rocksDBStateBackend = createStateBackendWithOptimizePointLookup();
-		RocksDBKeyedStateBackend<Integer> keyedStateBackend = createKeyedStateBackend(rocksDBStateBackend, new MockEnvironmentBuilder().build(), IntSerializer.INSTANCE);
-		MapStateDescriptor<Integer, Long> stateDescriptor = new MapStateDescriptor<>("map", IntSerializer.INSTANCE, LongSerializer.INSTANCE);
-		MapState<Integer, Long> mapState = keyedStateBackend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, stateDescriptor);
-
-		keyedStateBackend.setCurrentKey(1);
-		Map<Integer, Long> expectedResult = new HashMap<>();
-		for (int i = 0; i < 100; i++) {
-			long uv = ThreadLocalRandom.current().nextLong();
-			mapState.put(i, uv);
-			expectedResult.put(i, uv);
-		}
-
-		Iterator<Map.Entry<Integer, Long>> iterator = mapState.entries().iterator();
-		while (iterator.hasNext()) {
-			Map.Entry<Integer, Long> entry = iterator.next();
-			assertEquals(entry.getValue(), expectedResult.remove(entry.getKey()));
-			iterator.remove();
-		}
-		assertTrue(expectedResult.isEmpty());
-		assertTrue(mapState.isEmpty());
-	}
-
-	/**
-	 * Tests to cover case when user misuse optimizeForPointLookup with peek operations on priority queue.
-	 *
-	 * <p>The option {@link ColumnFamilyOptions#optimizeForPointLookup(long)} would lead to iterator.seek with prefix bytes invalid.
-	 */
-	@Test
-	public void testMisuseOptimizePointLookupWithPriorityQueue() throws IOException {
-		RocksDBStateBackend rocksDBStateBackend = createStateBackendWithOptimizePointLookup();
-		RocksDBKeyedStateBackend<Integer> keyedStateBackend = createKeyedStateBackend(rocksDBStateBackend, new MockEnvironmentBuilder().build(), IntSerializer.INSTANCE);
-		KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<Integer, VoidNamespace>> priorityQueue =
-			keyedStateBackend.create("timer", new TimerSerializer<>(keyedStateBackend.getKeySerializer(), VoidNamespaceSerializer.INSTANCE));
-
-		PriorityQueue<TimerHeapInternalTimer<Integer, VoidNamespace>> expectedPriorityQueue = new PriorityQueue<>((o1, o2) -> (int) (o1.getTimestamp() - o2.getTimestamp()));
-		// ensure we insert timers more than cache capacity.
-		int queueSize = RocksDBPriorityQueueSetFactory.DEFAULT_CACHES_SIZE + 42;
-		List<Integer> timeStamps = IntStream.range(0, queueSize).boxed().collect(Collectors.toList());
-		Collections.shuffle(timeStamps);
-		for (Integer timeStamp : timeStamps) {
-			TimerHeapInternalTimer<Integer, VoidNamespace> timer = new TimerHeapInternalTimer<>(timeStamp, timeStamp, VoidNamespace.INSTANCE);
-			priorityQueue.add(timer);
-			expectedPriorityQueue.add(timer);
-		}
-		assertEquals(queueSize, priorityQueue.size());
-		TimerHeapInternalTimer<Integer, VoidNamespace> timer;
-		while ((timer = priorityQueue.poll()) != null) {
-			assertEquals(expectedPriorityQueue.poll(), timer);
-		}
-		assertTrue(expectedPriorityQueue.isEmpty());
-		assertTrue(priorityQueue.isEmpty());
-
-	}
-
-	private RocksDBStateBackend createStateBackendWithOptimizePointLookup() throws IOException {
-		RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(tempFolder.newFolder().toURI(), true);
-		rocksDBStateBackend.setPriorityQueueStateType(RocksDBStateBackend.PriorityQueueStateType.ROCKSDB);
-		rocksDBStateBackend.setRocksDBOptions(new RocksDBOptionsFactory() {
-
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public DBOptions createDBOptions(DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
-				return currentOptions;
-			}
-
-			@Override
-			public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
-				return currentOptions.optimizeForPointLookup(64);
-			}
-		});
-		return rocksDBStateBackend;
-	}
-}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBTestUtils.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBTestUtils.java
index cee56aa..ed44a73 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBTestUtils.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBTestUtils.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
-import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.state.KeyGroupRange;
@@ -36,7 +35,6 @@ import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.RocksDB;
 
 import java.io.File;
-import java.io.IOException;
 import java.util.Collections;
 
 /**
@@ -100,23 +98,4 @@ public final class RocksDBTestUtils {
 				defaultCFHandle,
 				new CloseableRegistry());
 	}
-
-	public static <K> RocksDBKeyedStateBackend<K> createKeyedStateBackend(
-			RocksDBStateBackend rocksDbBackend,
-			Environment env,
-			TypeSerializer<K> keySerializer) throws IOException {
-
-		return (RocksDBKeyedStateBackend<K>) rocksDbBackend.createKeyedStateBackend(
-			env,
-			env.getJobID(),
-			"test_op",
-			keySerializer,
-			1,
-			new KeyGroupRange(0, 0),
-			env.getTaskKvStateRegistry(),
-			TtlTimeProvider.DEFAULT,
-			new UnregisteredMetricsGroup(),
-			Collections.emptyList(),
-			new CloseableRegistry());
-	}
 }
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksKeyGroupsRocksSingleStateIteratorTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksKeyGroupsRocksSingleStateIteratorTest.java
index 2c73fc7..4447f08 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksKeyGroupsRocksSingleStateIteratorTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksKeyGroupsRocksSingleStateIteratorTest.java
@@ -30,7 +30,6 @@ import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 import org.rocksdb.ColumnFamilyDescriptor;
 import org.rocksdb.ColumnFamilyHandle;
-import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
 
 import java.io.DataOutputStream;
@@ -75,8 +74,7 @@ public class RocksKeyGroupsRocksSingleStateIteratorTest {
 	public void testMergeIterator(int maxParallelism) throws Exception {
 		Random random = new Random(1234);
 
-		try (ReadOptions readOptions = new ReadOptions();
-			RocksDB rocksDB = RocksDB.open(tempFolder.getRoot().getAbsolutePath())) {
+		try (RocksDB rocksDB = RocksDB.open(tempFolder.getRoot().getAbsolutePath())) {
 			List<Tuple2<RocksIteratorWrapper, Integer>> rocksIteratorsWithKVStateId = new ArrayList<>();
 			List<Tuple2<ColumnFamilyHandle, Integer>> columnFamilyHandlesWithKeyCount = new ArrayList<>();
 
@@ -110,7 +108,7 @@ public class RocksKeyGroupsRocksSingleStateIteratorTest {
 
 			int id = 0;
 			for (Tuple2<ColumnFamilyHandle, Integer> columnFamilyHandle : columnFamilyHandlesWithKeyCount) {
-				rocksIteratorsWithKVStateId.add(new Tuple2<>(RocksDBOperationUtils.getRocksIterator(rocksDB, columnFamilyHandle.f0, readOptions), id));
+				rocksIteratorsWithKVStateId.add(new Tuple2<>(RocksDBOperationUtils.getRocksIterator(rocksDB, columnFamilyHandle.f0), id));
 				++id;
 			}
 
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java
index ce06d1f..96003d6 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.contrib.streaming.state.benchmark;
 
 import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
+import org.apache.flink.contrib.streaming.state.RocksDBOperationUtils;
 import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper;
 import org.apache.flink.core.memory.MemoryUtils;
 import org.apache.flink.testutils.junit.RetryOnFailure;
@@ -166,7 +167,7 @@ public class RocksDBPerformanceTest extends TestLogger {
 
 			int pos = 0;
 
-			try (final RocksIteratorWrapper iterator = new RocksIteratorWrapper(rocksDB.newIterator())) {
+			try (final RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(rocksDB)) {
 				// seek to start
 				unsafe.putInt(keyTemplate, offset, 0);
 				iterator.seek(keyTemplate);


[flink] 02/03: Revert "[FLINK-17800][roksdb] Support customized RocksDB write/read options and use RocksDBResourceContainer to get them"

Posted by rm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rmetzger pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6e367fb06a3f02360bd8e9216dd477d3e3b68186
Author: Robert Metzger <rm...@apache.org>
AuthorDate: Wed Jun 17 10:13:19 2020 +0200

    Revert "[FLINK-17800][roksdb] Support customized RocksDB write/read options and use RocksDBResourceContainer to get them"
    
    This reverts commit f1250625b2ade530fa2619d6e1bb734832748d31.
---
 .../streaming/state/RocksDBKeyedStateBackend.java  |  8 +++--
 .../state/RocksDBKeyedStateBackendBuilder.java     | 17 +++++++++--
 .../streaming/state/RocksDBOptionsFactory.java     | 34 ----------------------
 .../streaming/state/RocksDBResourceContainer.java  | 34 ----------------------
 .../state/RocksDBResourceContainerTest.java        | 14 ---------
 5 files changed, 21 insertions(+), 86 deletions(-)

diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 24b897a..f0cce0b 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -219,6 +219,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		ExecutionConfig executionConfig,
 		TtlTimeProvider ttlTimeProvider,
 		RocksDB db,
+		WriteOptions writeOptions,
+		ReadOptions readOptions,
 		LinkedHashMap<String, RocksDbKvStateInfo> kvStateInformation,
 		int keyGroupPrefixBytes,
 		CloseableRegistry cancelStreamRegistry,
@@ -257,8 +259,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		this.keyGroupPrefixBytes = keyGroupPrefixBytes;
 		this.kvStateInformation = kvStateInformation;
 
-		this.writeOptions = optionsContainer.getWriteOptions();
-		this.readOptions = optionsContainer.getReadOptions();
+		this.writeOptions = writeOptions;
+		this.readOptions = readOptions;
 		checkArgument(writeBatchSize >= 0, "Write batch size have to be no negative value.");
 		this.writeBatchSize = writeBatchSize;
 		this.db = db;
@@ -368,6 +370,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			columnFamilyOptions.forEach(IOUtils::closeQuietly);
 
 			IOUtils.closeQuietly(optionsContainer);
+			IOUtils.closeQuietly(readOptions);
+			IOUtils.closeQuietly(writeOptions);
 
 			ttlCompactFiltersManager.disposeAndClearRegisteredCompactionFactories();
 
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
index 1846bde..b9bdbb8 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
@@ -54,7 +54,9 @@ import org.apache.flink.util.ResourceGuard;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
+import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
+import org.rocksdb.WriteOptions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -241,6 +243,9 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
 		ColumnFamilyHandle defaultColumnFamilyHandle = null;
 		RocksDBNativeMetricMonitor nativeMetricMonitor = null;
 		CloseableRegistry cancelStreamRegistryForBackend = new CloseableRegistry();
+		// The write options to use in the states.
+		WriteOptions writeOptions = null;
+		ReadOptions readOptions = null;
 		LinkedHashMap<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation = new LinkedHashMap<>();
 		RocksDB db = null;
 		AbstractRocksDBRestoreOperation restoreOperation = null;
@@ -278,7 +283,9 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
 				}
 			}
 
-			writeBatchWrapper = new RocksDBWriteBatchWrapper(db, optionsContainer.getWriteOptions(), writeBatchSize);
+			writeOptions = new WriteOptions().setDisableWAL(true);
+			readOptions = RocksDBOperationUtils.createTotalOrderSeekReadOptions();
+			writeBatchWrapper = new RocksDBWriteBatchWrapper(db, writeOptions, writeBatchSize);
 			// it is important that we only create the key builder after the restore, and not before;
 			// restore operations may reconfigure the key serializer, so accessing the key serializer
 			// only now we can be certain that the key serializer used in the builder is final.
@@ -294,6 +301,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
 				keyGroupPrefixBytes,
 				kvStateInformation,
 				db,
+				readOptions,
 				writeBatchWrapper,
 				nativeMetricMonitor);
 		} catch (Throwable e) {
@@ -313,6 +321,8 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
 			IOUtils.closeQuietly(restoreOperation);
 			IOUtils.closeAllQuietly(columnFamilyOptions);
 			IOUtils.closeQuietly(optionsContainer);
+			IOUtils.closeQuietly(readOptions);
+			IOUtils.closeQuietly(writeOptions);
 			ttlCompactFiltersManager.disposeAndClearRegisteredCompactionFactories();
 			kvStateInformation.clear();
 			try {
@@ -343,6 +353,8 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
 			this.executionConfig,
 			this.ttlTimeProvider,
 			db,
+			writeOptions,
+			readOptions,
 			kvStateInformation,
 			keyGroupPrefixBytes,
 			cancelStreamRegistryForBackend,
@@ -471,6 +483,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
 		int keyGroupPrefixBytes,
 		Map<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation,
 		RocksDB db,
+		ReadOptions readOptions,
 		RocksDBWriteBatchWrapper writeBatchWrapper,
 		RocksDBNativeMetricMonitor nativeMetricMonitor) {
 		PriorityQueueSetFactory priorityQueueFactory;
@@ -485,7 +498,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
 					numberOfKeyGroups,
 					kvStateInformation,
 					db,
-					optionsContainer.getReadOptions(),
+					readOptions,
 					writeBatchWrapper,
 					nativeMetricMonitor,
 					columnFamilyOptionsFactory
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactory.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactory.java
index 5ee5098..a5fd1e9 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactory.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactory.java
@@ -20,8 +20,6 @@ package org.apache.flink.contrib.streaming.state;
 
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
-import org.rocksdb.ReadOptions;
-import org.rocksdb.WriteOptions;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -95,38 +93,6 @@ public interface RocksDBOptionsFactory extends OptionsFactory, java.io.Serializa
 		return nativeMetricOptions;
 	}
 
-	/**
-	 * This method should set the additional options on top of the current options object.
-	 * The current options object may contain pre-defined options based on flags that have
-	 * been configured on the state backend.
-	 *
-	 * <p>It is important to set the options on the current object and return the result from
-	 * the setter methods, otherwise the pre-defined options may get lost.
-	 *
-	 * @param currentOptions The options object with the pre-defined options.
-	 * @param handlesToClose The collection to register newly created {@link org.rocksdb.RocksObject}s.
-	 * @return The options object on which the additional options are set.
-	 */
-	default WriteOptions createWriteOptions(WriteOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
-		return currentOptions;
-	}
-
-	/**
-	 * This method should set the additional options on top of the current options object.
-	 * The current options object may contain pre-defined options based on flags that have
-	 * been configured on the state backend.
-	 *
-	 * <p>It is important to set the options on the current object and return the result from
-	 * the setter methods, otherwise the pre-defined options may get lost.
-	 *
-	 * @param currentOptions The options object with the pre-defined options.
-	 * @param handlesToClose The collection to register newly created {@link org.rocksdb.RocksObject}s.
-	 * @return The options object on which the additional options are set.
-	 */
-	default ReadOptions createReadOptions(ReadOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
-		return currentOptions;
-	}
-
 	// ------------------------------------------------------------------------
 	//  for compatibility
 	// ------------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
index 8c4d957..c5227a5 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
@@ -26,9 +26,7 @@ import org.rocksdb.BlockBasedTableConfig;
 import org.rocksdb.Cache;
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
-import org.rocksdb.ReadOptions;
 import org.rocksdb.TableFormatConfig;
-import org.rocksdb.WriteOptions;
 
 import javax.annotation.Nullable;
 
@@ -140,38 +138,6 @@ public final class RocksDBResourceContainer implements AutoCloseable {
 		return opt;
 	}
 
-	/**
-	 * Gets the RocksDB {@link WriteOptions} to be used for write operations.
-	 */
-	public WriteOptions getWriteOptions() {
-		// Disable WAL by default
-		WriteOptions opt = new WriteOptions().setDisableWAL(true);
-		handlesToClose.add(opt);
-
-		// add user-defined options factory, if specified
-		if (optionsFactory != null) {
-			opt = optionsFactory.createWriteOptions(opt, handlesToClose);
-		}
-
-		return opt;
-	}
-
-	/**
-	 * Gets the RocksDB {@link ReadOptions} to be used for read operations.
-	 */
-	public ReadOptions getReadOptions() {
-		// We ensure total order seek by default to prevent user misuse, see FLINK-17800 for more details
-		ReadOptions opt = RocksDBOperationUtils.createTotalOrderSeekReadOptions();
-		handlesToClose.add(opt);
-
-		// add user-defined options factory, if specified
-		if (optionsFactory != null) {
-			opt = optionsFactory.createReadOptions(opt, handlesToClose);
-		}
-
-		return opt;
-	}
-
 	RocksDBNativeMetricOptions getMemoryWatcherOptions(RocksDBNativeMetricOptions defaultMetricOptions) {
 		return optionsFactory == null
 				? defaultMetricOptions
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainerTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainerTest.java
index eb76e62..73d4bab 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainerTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainerTest.java
@@ -31,9 +31,7 @@ import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
 import org.rocksdb.LRUCache;
 import org.rocksdb.NativeLibraryLoader;
-import org.rocksdb.ReadOptions;
 import org.rocksdb.WriteBufferManager;
-import org.rocksdb.WriteOptions;
 
 import java.io.IOException;
 import java.lang.reflect.Field;
@@ -248,16 +246,4 @@ public class RocksDBResourceContainerTest {
 		assertThat(cache.isOwningHandle(), is(false));
 		assertThat(wbm.isOwningHandle(), is(false));
 	}
-
-	@Test
-	public void testFreeWriteReadOptionsAfterClose() throws Exception {
-		RocksDBResourceContainer container = new RocksDBResourceContainer();
-		WriteOptions writeOptions = container.getWriteOptions();
-		ReadOptions readOptions = container.getReadOptions();
-		assertThat(writeOptions.isOwningHandle(), is(true));
-		assertThat(readOptions.isOwningHandle(), is(true));
-		container.close();
-		assertThat(writeOptions.isOwningHandle(), is(false));
-		assertThat(readOptions.isOwningHandle(), is(false));
-	}
 }


[flink] 01/03: [FLINK-16795][e2e] Increase e2e execution timeout +20m

Posted by rm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rmetzger pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6a5213ee605297687ec4b8a2f872df05cd4921a3
Author: Robert Metzger <rm...@apache.org>
AuthorDate: Wed Jun 17 13:34:23 2020 +0200

    [FLINK-16795][e2e] Increase e2e execution timeout +20m
---
 tools/azure-pipelines/jobs-template.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tools/azure-pipelines/jobs-template.yml b/tools/azure-pipelines/jobs-template.yml
index 0efb1a2..2930f31 100644
--- a/tools/azure-pipelines/jobs-template.yml
+++ b/tools/azure-pipelines/jobs-template.yml
@@ -152,7 +152,7 @@ jobs:
   #condition: or(eq(variables['MODE'], 'e2e'), eq(${{parameters.run_end_to_end}}, 'true'))
   # We are running this in a separate pool
   pool: ${{parameters.e2e_pool_definition}}
-  timeoutInMinutes: 240
+  timeoutInMinutes: 260
   cancelTimeoutInMinutes: 1
   workspace:
     clean: all