You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by li...@apache.org on 2020/06/26 14:34:02 UTC

[flink] 01/03: [FLINK-17800][rocksdb] Ensure total order seek to avoid user misuse

This is an automated email from the ASF dual-hosted git repository.

liyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3516e37ae0aa4ee040b6844f336541315a455ce9
Author: Yun Tang <my...@live.com>
AuthorDate: Fri Jun 5 01:16:27 2020 +0800

    [FLINK-17800][rocksdb] Ensure total order seek to avoid user misuse
---
 .../state/RocksDBCachingPriorityQueueSet.java      |   8 +-
 .../state/RocksDBIncrementalCheckpointUtils.java   |   4 +-
 .../streaming/state/RocksDBKeyedStateBackend.java  |  23 +++-
 .../state/RocksDBKeyedStateBackendBuilder.java     |  17 ++-
 .../contrib/streaming/state/RocksDBMapState.java   |   6 +-
 .../streaming/state/RocksDBOperationUtils.java     |  15 ++-
 .../state/RocksDBPriorityQueueSetFactory.java      |   9 +-
 .../RocksDBIncrementalRestoreOperation.java        |   7 +-
 ...rtitionedPriorityQueueWithRocksDBStoreTest.java |   1 +
 .../contrib/streaming/state/RocksDBResource.java   |   4 +-
 .../state/RocksDBRocksStateKeysIteratorTest.java   |   2 +-
 .../state/RocksDBStateBackendConfigTest.java       |  28 +---
 .../state/RocksDBStateMisuseOptionTest.java        | 147 +++++++++++++++++++++
 .../contrib/streaming/state/RocksDBTestUtils.java  |  21 +++
 ...RocksKeyGroupsRocksSingleStateIteratorTest.java |   6 +-
 .../state/benchmark/RocksDBPerformanceTest.java    |   3 +-
 16 files changed, 255 insertions(+), 46 deletions(-)

diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBCachingPriorityQueueSet.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBCachingPriorityQueueSet.java
index 364185a..fb9a833 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBCachingPriorityQueueSet.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBCachingPriorityQueueSet.java
@@ -29,6 +29,7 @@ import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.shaded.guava18.com.google.common.primitives.UnsignedBytes;
 
 import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
 
@@ -63,6 +64,9 @@ public class RocksDBCachingPriorityQueueSet<E extends HeapPriorityQueueElement>
 	@Nonnull
 	private final RocksDB db;
 
+	@Nonnull
+	private final ReadOptions readOptions;
+
 	/** Handle to the column family of the RocksDB instance in which the elements are stored. */
 	@Nonnull
 	private final ColumnFamilyHandle columnFamilyHandle;
@@ -112,6 +116,7 @@ public class RocksDBCachingPriorityQueueSet<E extends HeapPriorityQueueElement>
 		@Nonnegative int keyGroupId,
 		@Nonnegative int keyGroupPrefixBytes,
 		@Nonnull RocksDB db,
+		@Nonnull ReadOptions readOptions,
 		@Nonnull ColumnFamilyHandle columnFamilyHandle,
 		@Nonnull TypeSerializer<E> byteOrderProducingSerializer,
 		@Nonnull DataOutputSerializer outputStream,
@@ -119,6 +124,7 @@ public class RocksDBCachingPriorityQueueSet<E extends HeapPriorityQueueElement>
 		@Nonnull RocksDBWriteBatchWrapper batchWrapper,
 		@Nonnull OrderedByteArraySetCache orderedByteArraySetCache) {
 		this.db = db;
+		this.readOptions = readOptions;
 		this.columnFamilyHandle = columnFamilyHandle;
 		this.byteOrderProducingSerializer = byteOrderProducingSerializer;
 		this.batchWrapper = batchWrapper;
@@ -304,7 +310,7 @@ public class RocksDBCachingPriorityQueueSet<E extends HeapPriorityQueueElement>
 		flushWriteBatch();
 		return new RocksBytesIterator(
 			new RocksIteratorWrapper(
-				db.newIterator(columnFamilyHandle)));
+				db.newIterator(columnFamilyHandle, readOptions)));
 	}
 
 	/**
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java
index 1f43dd0..5bce695 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java
@@ -21,6 +21,7 @@ import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 
 import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
 
@@ -116,7 +117,8 @@ public class RocksDBIncrementalCheckpointUtils {
 		@Nonnegative long writeBatchSize) throws RocksDBException {
 
 		for (ColumnFamilyHandle columnFamilyHandle : columnFamilyHandles) {
-			try (RocksIteratorWrapper iteratorWrapper = RocksDBOperationUtils.getRocksIterator(db, columnFamilyHandle);
+			try (ReadOptions readOptions = RocksDBOperationUtils.createTotalOrderSeekReadOptions();
+				RocksIteratorWrapper iteratorWrapper = RocksDBOperationUtils.getRocksIterator(db, columnFamilyHandle, readOptions);
 				RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(db, writeBatchSize)) {
 
 				iteratorWrapper.seek(beginKeyBytes);
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 61d8688..f0cce0b 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -66,6 +66,7 @@ import org.apache.flink.util.StateMigrationException;
 
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
 import org.rocksdb.Snapshot;
@@ -150,6 +151,12 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	private final WriteOptions writeOptions;
 
 	/**
+	 * The read options to use when creating iterators.
+	 * We ensure total order seek in case user misuse, see FLINK-17800 for more details.
+	 */
+	private final ReadOptions readOptions;
+
+	/**
 	 * The max memory size for one batch in {@link RocksDBWriteBatchWrapper}.
 	 */
 	private final long writeBatchSize;
@@ -212,6 +219,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		ExecutionConfig executionConfig,
 		TtlTimeProvider ttlTimeProvider,
 		RocksDB db,
+		WriteOptions writeOptions,
+		ReadOptions readOptions,
 		LinkedHashMap<String, RocksDbKvStateInfo> kvStateInformation,
 		int keyGroupPrefixBytes,
 		CloseableRegistry cancelStreamRegistry,
@@ -250,7 +259,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		this.keyGroupPrefixBytes = keyGroupPrefixBytes;
 		this.kvStateInformation = kvStateInformation;
 
-		this.writeOptions = new WriteOptions().setDisableWAL(true);
+		this.writeOptions = writeOptions;
+		this.readOptions = readOptions;
 		checkArgument(writeBatchSize >= 0, "Write batch size have to be no negative value.");
 		this.writeBatchSize = writeBatchSize;
 		this.db = db;
@@ -290,7 +300,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			throw new FlinkRuntimeException("Failed to get keys from RocksDB state backend.", ex);
 		}
 
-		RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(db, columnInfo.columnFamilyHandle);
+		RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(db, columnInfo.columnFamilyHandle, readOptions);
 		iterator.seekToFirst();
 
 		final RocksStateKeysIterator<K> iteratorWrapper = new RocksStateKeysIterator<>(iterator, state, getKeySerializer(), keyGroupPrefixBytes,
@@ -360,6 +370,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			columnFamilyOptions.forEach(IOUtils::closeQuietly);
 
 			IOUtils.closeQuietly(optionsContainer);
+			IOUtils.closeQuietly(readOptions);
 			IOUtils.closeQuietly(writeOptions);
 
 			ttlCompactFiltersManager.disposeAndClearRegisteredCompactionFactories();
@@ -407,6 +418,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		return writeOptions;
 	}
 
+	public ReadOptions getReadOptions() {
+		return readOptions;
+	}
+
 	RocksDBSerializedCompositeKeyBuilder<K> getSharedRocksKeyBuilder() {
 		return sharedRocksKeyBuilder;
 	}
@@ -606,7 +621,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 		Snapshot rocksDBSnapshot = db.getSnapshot();
 		try (
-			RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(db, stateMetaInfo.f0);
+			RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(db, stateMetaInfo.f0, readOptions);
 			RocksDBWriteBatchWrapper batchWriter = new RocksDBWriteBatchWrapper(db, getWriteOptions(), getWriteBatchSize())
 		) {
 			iterator.seekToFirst();
@@ -681,7 +696,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 		for (RocksDbKvStateInfo metaInfo : kvStateInformation.values()) {
 			//TODO maybe filterOrTransform only for k/v states
-			try (RocksIteratorWrapper rocksIterator = RocksDBOperationUtils.getRocksIterator(db, metaInfo.columnFamilyHandle)) {
+			try (RocksIteratorWrapper rocksIterator = RocksDBOperationUtils.getRocksIterator(db, metaInfo.columnFamilyHandle, readOptions)) {
 				rocksIterator.seekToFirst();
 
 				while (rocksIterator.isValid()) {
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
index d1a0184..b9bdbb8 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
@@ -54,6 +54,7 @@ import org.apache.flink.util.ResourceGuard;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
+import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
 import org.rocksdb.WriteOptions;
 import org.slf4j.Logger;
@@ -244,6 +245,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
 		CloseableRegistry cancelStreamRegistryForBackend = new CloseableRegistry();
 		// The write options to use in the states.
 		WriteOptions writeOptions = null;
+		ReadOptions readOptions = null;
 		LinkedHashMap<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation = new LinkedHashMap<>();
 		RocksDB db = null;
 		AbstractRocksDBRestoreOperation restoreOperation = null;
@@ -282,6 +284,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
 			}
 
 			writeOptions = new WriteOptions().setDisableWAL(true);
+			readOptions = RocksDBOperationUtils.createTotalOrderSeekReadOptions();
 			writeBatchWrapper = new RocksDBWriteBatchWrapper(db, writeOptions, writeBatchSize);
 			// it is important that we only create the key builder after the restore, and not before;
 			// restore operations may reconfigure the key serializer, so accessing the key serializer
@@ -294,8 +297,13 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
 			snapshotStrategy = initializeSavepointAndCheckpointStrategies(cancelStreamRegistryForBackend, rocksDBResourceGuard,
 				kvStateInformation, keyGroupPrefixBytes, db, backendUID, materializedSstFiles, lastCompletedCheckpointId);
 			// init priority queue factory
-			priorityQueueFactory = initPriorityQueueFactory(keyGroupPrefixBytes, kvStateInformation, db,
-				writeBatchWrapper, nativeMetricMonitor);
+			priorityQueueFactory = initPriorityQueueFactory(
+				keyGroupPrefixBytes,
+				kvStateInformation,
+				db,
+				readOptions,
+				writeBatchWrapper,
+				nativeMetricMonitor);
 		} catch (Throwable e) {
 			// Do clean up
 			List<ColumnFamilyOptions> columnFamilyOptions = new ArrayList<>(kvStateInformation.values().size());
@@ -313,6 +321,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
 			IOUtils.closeQuietly(restoreOperation);
 			IOUtils.closeAllQuietly(columnFamilyOptions);
 			IOUtils.closeQuietly(optionsContainer);
+			IOUtils.closeQuietly(readOptions);
 			IOUtils.closeQuietly(writeOptions);
 			ttlCompactFiltersManager.disposeAndClearRegisteredCompactionFactories();
 			kvStateInformation.clear();
@@ -344,6 +353,8 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
 			this.executionConfig,
 			this.ttlTimeProvider,
 			db,
+			writeOptions,
+			readOptions,
 			kvStateInformation,
 			keyGroupPrefixBytes,
 			cancelStreamRegistryForBackend,
@@ -472,6 +483,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
 		int keyGroupPrefixBytes,
 		Map<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation,
 		RocksDB db,
+		ReadOptions readOptions,
 		RocksDBWriteBatchWrapper writeBatchWrapper,
 		RocksDBNativeMetricMonitor nativeMetricMonitor) {
 		PriorityQueueSetFactory priorityQueueFactory;
@@ -486,6 +498,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
 					numberOfKeyGroups,
 					kvStateInformation,
 					db,
+					readOptions,
 					writeBatchWrapper,
 					nativeMetricMonitor,
 					columnFamilyOptionsFactory
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
index f0b47cd..6c7f541 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
@@ -236,7 +236,7 @@ class RocksDBMapState<K, N, UK, UV>
 	public boolean isEmpty() {
 		final byte[] prefixBytes = serializeCurrentKeyWithGroupAndNamespace();
 
-		try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(backend.db, columnFamily)) {
+		try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(backend.db, columnFamily, backend.getReadOptions())) {
 
 			iterator.seek(prefixBytes);
 
@@ -247,7 +247,7 @@ class RocksDBMapState<K, N, UK, UV>
 	@Override
 	public void clear() {
 		try {
-			try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(backend.db, columnFamily);
+			try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(backend.db, columnFamily, backend.getReadOptions());
 				RocksDBWriteBatchWrapper rocksDBWriteBatchWrapper = new RocksDBWriteBatchWrapper(backend.db, backend.getWriteOptions(), backend.getWriteBatchSize())) {
 
 				final byte[] keyPrefixBytes = serializeCurrentKeyWithGroupAndNamespace();
@@ -570,7 +570,7 @@ class RocksDBMapState<K, N, UK, UV>
 
 			// use try-with-resources to ensure RocksIterator can be release even some runtime exception
 			// occurred in the below code block.
-			try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(db, columnFamily)) {
+			try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(db, columnFamily, backend.getReadOptions())) {
 
 				/*
 				 * The iteration starts from the prefix bytes at the first loading. After #nextEntry() is called,
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java
index 1455c1b..0f564d5 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java
@@ -32,6 +32,7 @@ import org.rocksdb.ColumnFamilyDescriptor;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
+import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
 import org.slf4j.Logger;
@@ -94,12 +95,18 @@ public class RocksDBOperationUtils {
 		return dbRef;
 	}
 
-	public static RocksIteratorWrapper getRocksIterator(RocksDB db) {
-		return new RocksIteratorWrapper(db.newIterator());
+	public static RocksIteratorWrapper getRocksIterator(RocksDB db, ColumnFamilyHandle columnFamilyHandle, ReadOptions readOptions) {
+		return new RocksIteratorWrapper(db.newIterator(columnFamilyHandle, readOptions));
 	}
 
-	public static RocksIteratorWrapper getRocksIterator(RocksDB db, ColumnFamilyHandle columnFamilyHandle) {
-		return new RocksIteratorWrapper(db.newIterator(columnFamilyHandle));
+	/**
+	 * Create a total order read option to avoid user misuse, see FLINK-17800 for more details.
+	 *
+	 * <p>Note, remember to close the generated {@link ReadOptions} when dispose.
+	 */
+	// TODO We would remove this method once we bump RocksDB version larger than 6.2.2.
+	public static ReadOptions createTotalOrderSeekReadOptions() {
+		return new ReadOptions().setTotalOrderSeek(true);
 	}
 
 	public static void registerKvStateInformation(
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBPriorityQueueSetFactory.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBPriorityQueueSetFactory.java
index 1cbeebe..59daff8 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBPriorityQueueSetFactory.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBPriorityQueueSetFactory.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.contrib.streaming.state;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
 import org.apache.flink.core.memory.DataInputDeserializer;
@@ -36,6 +37,7 @@ import org.apache.flink.util.StateMigrationException;
 
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
 
 import javax.annotation.Nonnull;
@@ -52,7 +54,8 @@ public class RocksDBPriorityQueueSetFactory implements PriorityQueueSetFactory {
 	/**
 	 * Default cache size per key-group.
 	 */
-	private static final int DEFAULT_CACHES_SIZE = 128; //TODO make this configurable
+	@VisibleForTesting
+	static final int DEFAULT_CACHES_SIZE = 128; //TODO make this configurable
 
 	/**
 	 * A shared buffer to serialize elements for the priority queue.
@@ -71,6 +74,7 @@ public class RocksDBPriorityQueueSetFactory implements PriorityQueueSetFactory {
 	private final int numberOfKeyGroups;
 	private final Map<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation;
 	private final RocksDB db;
+	private final ReadOptions readOptions;
 	private final RocksDBWriteBatchWrapper writeBatchWrapper;
 	private final RocksDBNativeMetricMonitor nativeMetricMonitor;
 	private final Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory;
@@ -81,6 +85,7 @@ public class RocksDBPriorityQueueSetFactory implements PriorityQueueSetFactory {
 		int numberOfKeyGroups,
 		Map<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation,
 		RocksDB db,
+		ReadOptions readOptions,
 		RocksDBWriteBatchWrapper writeBatchWrapper,
 		RocksDBNativeMetricMonitor nativeMetricMonitor,
 		Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory) {
@@ -89,6 +94,7 @@ public class RocksDBPriorityQueueSetFactory implements PriorityQueueSetFactory {
 		this.numberOfKeyGroups = numberOfKeyGroups;
 		this.kvStateInformation = kvStateInformation;
 		this.db = db;
+		this.readOptions = readOptions;
 		this.writeBatchWrapper = writeBatchWrapper;
 		this.nativeMetricMonitor = nativeMetricMonitor;
 		this.columnFamilyOptionsFactory = columnFamilyOptionsFactory;
@@ -122,6 +128,7 @@ public class RocksDBPriorityQueueSetFactory implements PriorityQueueSetFactory {
 						keyGroupId,
 						keyGroupPrefixBytes,
 						db,
+						readOptions,
 						columnFamilyHandle,
 						byteOrderedElementSerializer,
 						sharedElementOutView,
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
index f7abed6..53c7537 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
@@ -51,6 +51,7 @@ import org.rocksdb.ColumnFamilyDescriptor;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
+import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
 import org.slf4j.Logger;
@@ -309,7 +310,7 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor
 						null, tmpRestoreDBInfo.stateMetaInfoSnapshots.get(i))
 						.columnFamilyHandle;
 
-					try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(tmpRestoreDBInfo.db, tmpColumnFamilyHandle)) {
+					try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(tmpRestoreDBInfo.db, tmpColumnFamilyHandle, tmpRestoreDBInfo.readOptions)) {
 
 						iterator.seek(startKeyGroupPrefixBytes);
 
@@ -376,6 +377,8 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor
 		@Nonnull
 		private final List<StateMetaInfoSnapshot> stateMetaInfoSnapshots;
 
+		private final ReadOptions readOptions;
+
 		private RestoredDBInstance(
 			@Nonnull RocksDB db,
 			@Nonnull List<ColumnFamilyHandle> columnFamilyHandles,
@@ -386,6 +389,7 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor
 			this.columnFamilyHandles = columnFamilyHandles;
 			this.columnFamilyDescriptors = columnFamilyDescriptors;
 			this.stateMetaInfoSnapshots = stateMetaInfoSnapshots;
+			this.readOptions = RocksDBOperationUtils.createTotalOrderSeekReadOptions();
 		}
 
 		@Override
@@ -397,6 +401,7 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor
 			IOUtils.closeAllQuietly(columnFamilyHandles);
 			IOUtils.closeQuietly(db);
 			IOUtils.closeAllQuietly(columnFamilyOptions);
+			IOUtils.closeQuietly(readOptions);
 		}
 	}
 
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/KeyGroupPartitionedPriorityQueueWithRocksDBStoreTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/KeyGroupPartitionedPriorityQueueWithRocksDBStoreTest.java
index d402c3d..fd68d70 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/KeyGroupPartitionedPriorityQueueWithRocksDBStoreTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/KeyGroupPartitionedPriorityQueueWithRocksDBStoreTest.java
@@ -60,6 +60,7 @@ public class KeyGroupPartitionedPriorityQueueWithRocksDBStoreTest extends Intern
 				keyGroupId,
 				keyGroupPrefixBytes,
 				rocksDBResource.getRocksDB(),
+				rocksDBResource.getReadOptions(),
 				rocksDBResource.getDefaultColumnFamily(),
 				TestElementSerializer.INSTANCE,
 				outputStreamWithPos,
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResource.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResource.java
index d25baa7..3b3b697 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResource.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResource.java
@@ -101,7 +101,7 @@ public class RocksDBResource extends ExternalResource {
 					LOG.error("Close previous ColumnOptions's instance failed.", e);
 				}
 
-				return PredefinedOptions.FLASH_SSD_OPTIMIZED.createColumnOptions(handlesToClose);
+				return PredefinedOptions.FLASH_SSD_OPTIMIZED.createColumnOptions(handlesToClose).optimizeForPointLookup(40960);
 			}
 		});
 	}
@@ -155,7 +155,7 @@ public class RocksDBResource extends ExternalResource {
 			PredefinedOptions.DEFAULT.createColumnOptions(handlesToClose), handlesToClose);
 		this.writeOptions = new WriteOptions();
 		this.writeOptions.disableWAL();
-		this.readOptions = new ReadOptions();
+		this.readOptions = RocksDBOperationUtils.createTotalOrderSeekReadOptions();
 		this.columnFamilyHandles = new ArrayList<>(1);
 		this.rocksDB = RocksDB.open(
 			dbOptions,
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksStateKeysIteratorTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksStateKeysIteratorTest.java
index 8f71b65..1a4808f 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksStateKeysIteratorTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksStateKeysIteratorTest.java
@@ -126,7 +126,7 @@ public class RocksDBRocksStateKeysIteratorTest {
 			ColumnFamilyHandle handle = keyedStateBackend.getColumnFamilyHandle(testStateName);
 
 			try (
-				RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(keyedStateBackend.db, handle);
+				RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(keyedStateBackend.db, handle, keyedStateBackend.getReadOptions());
 				RocksStateKeysIterator<K> iteratorWrapper =
 					new RocksStateKeysIterator<>(
 						iterator,
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
index 0cc84db..e64fc79 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
@@ -61,6 +61,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 
+import static org.apache.flink.contrib.streaming.state.RocksDBTestUtils.createKeyedStateBackend;
 import static org.hamcrest.CoreMatchers.anyOf;
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -123,7 +124,7 @@ public class RocksDBStateBackendConfigTest {
 		assertArrayEquals(new String[] { testDir1, testDir2 }, rocksDbBackend.getDbStoragePaths());
 
 		final MockEnvironment env = getMockEnvironment(tempFolder.newFolder());
-		final RocksDBKeyedStateBackend<Integer> keyedBackend = createKeyedStateBackend(rocksDbBackend, env);
+		final RocksDBKeyedStateBackend<Integer> keyedBackend = createKeyedStateBackend(rocksDbBackend, env, IntSerializer.INSTANCE);
 
 		try {
 			File instanceBasePath = keyedBackend.getInstanceBasePath();
@@ -160,7 +161,7 @@ public class RocksDBStateBackendConfigTest {
 
 		RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(tempFolder.newFolder().toURI().toString());
 
-		RocksDBKeyedStateBackend<Integer> keyedBackend = createKeyedStateBackend(rocksDbBackend, env);
+		RocksDBKeyedStateBackend<Integer> keyedBackend = createKeyedStateBackend(rocksDbBackend, env, IntSerializer.INSTANCE);
 		Assert.assertEquals(RocksDBPriorityQueueSetFactory.class, keyedBackend.getPriorityQueueFactory().getClass());
 		keyedBackend.dispose();
 
@@ -168,7 +169,7 @@ public class RocksDBStateBackendConfigTest {
 		conf.set(RocksDBOptions.TIMER_SERVICE_FACTORY, RocksDBStateBackend.PriorityQueueStateType.HEAP);
 
 		rocksDbBackend = rocksDbBackend.configure(conf, Thread.currentThread().getContextClassLoader());
-		keyedBackend = createKeyedStateBackend(rocksDbBackend, env);
+		keyedBackend = createKeyedStateBackend(rocksDbBackend, env, IntSerializer.INSTANCE);
 		Assert.assertEquals(
 			HeapPriorityQueueSetFactory.class,
 			keyedBackend.getPriorityQueueFactory().getClass());
@@ -197,7 +198,7 @@ public class RocksDBStateBackendConfigTest {
 		final RocksDBStateBackend configuredRocksDBStateBackend = backend.configure(
 			configFromConfFile,
 			Thread.currentThread().getContextClassLoader());
-		final RocksDBKeyedStateBackend<Integer> keyedBackend = createKeyedStateBackend(configuredRocksDBStateBackend, env);
+		final RocksDBKeyedStateBackend<Integer> keyedBackend = createKeyedStateBackend(configuredRocksDBStateBackend, env, IntSerializer.INSTANCE);
 
 		// priorityQueueStateType of the job backend should be preserved
 		assertThat(keyedBackend.getPriorityQueueFactory(), instanceOf(HeapPriorityQueueSetFactory.class));
@@ -254,7 +255,7 @@ public class RocksDBStateBackendConfigTest {
 		rocksDbBackend.setDbStoragePath(configuredPath);
 
 		final MockEnvironment env = getMockEnvironment(tempFolder.newFolder());
-		RocksDBKeyedStateBackend<Integer> keyedBackend = createKeyedStateBackend(rocksDbBackend, env);
+		RocksDBKeyedStateBackend<Integer> keyedBackend = createKeyedStateBackend(rocksDbBackend, env, IntSerializer.INSTANCE);
 
 		try {
 			File instanceBasePath = keyedBackend.getInstanceBasePath();
@@ -724,23 +725,6 @@ public class RocksDBStateBackendConfigTest {
 	//  Utilities
 	// ------------------------------------------------------------------------
 
-	static RocksDBKeyedStateBackend<Integer> createKeyedStateBackend(
-			RocksDBStateBackend rocksDbBackend, Environment env) throws Exception {
-
-		return (RocksDBKeyedStateBackend<Integer>) rocksDbBackend.createKeyedStateBackend(
-			env,
-			env.getJobID(),
-			"test_op",
-			IntSerializer.INSTANCE,
-			1,
-			new KeyGroupRange(0, 0),
-			env.getTaskKvStateRegistry(),
-			TtlTimeProvider.DEFAULT,
-			new UnregisteredMetricsGroup(),
-			Collections.emptyList(),
-			new CloseableRegistry());
-	}
-
 	static MockEnvironment getMockEnvironment(File... tempDirs) {
 		final String[] tempDirStrings = new String[tempDirs.length];
 		for (int i = 0; i < tempDirs.length; i++) {
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateMisuseOptionTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateMisuseOptionTest.java
new file mode 100644
index 0000000..59a4822
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateMisuseOptionTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
+import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.operators.TimerHeapInternalTimer;
+import org.apache.flink.streaming.api.operators.TimerSerializer;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.DBOptions;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.contrib.streaming.state.RocksDBTestUtils.createKeyedStateBackend;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests to cover cases that even user misuse some options, RocksDB state-backend could still work as expected or give explicit feedback.
+ *
+ * <p>RocksDB state-backend has some internal operations based on RocksDB's APIs which is transparent for users.
+ * However, user could still configure options via {@link RocksDBOptionsFactory}, and might lead some operations
+ * could not get expected result, e.g. FLINK-17800
+ */
+public class RocksDBStateMisuseOptionTest {
+
+	@Rule
+	public final TemporaryFolder tempFolder = new TemporaryFolder();
+
+	/**
+	 * Tests to cover case when user misuse optimizeForPointLookup with iterator interfaces on map state.
+	 *
+	 * <p>The option {@link ColumnFamilyOptions#optimizeForPointLookup(long)} would lead to iterator.seek with prefix bytes invalid.
+	 */
+	@Test
+	public void testMisuseOptimizePointLookupWithMapState() throws Exception {
+		RocksDBStateBackend rocksDBStateBackend = createStateBackendWithOptimizePointLookup();
+		RocksDBKeyedStateBackend<Integer> keyedStateBackend = createKeyedStateBackend(rocksDBStateBackend, new MockEnvironmentBuilder().build(), IntSerializer.INSTANCE);
+		MapStateDescriptor<Integer, Long> stateDescriptor = new MapStateDescriptor<>("map", IntSerializer.INSTANCE, LongSerializer.INSTANCE);
+		MapState<Integer, Long> mapState = keyedStateBackend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, stateDescriptor);
+
+		keyedStateBackend.setCurrentKey(1);
+		Map<Integer, Long> expectedResult = new HashMap<>();
+		for (int i = 0; i < 100; i++) {
+			long uv = ThreadLocalRandom.current().nextLong();
+			mapState.put(i, uv);
+			expectedResult.put(i, uv);
+		}
+
+		Iterator<Map.Entry<Integer, Long>> iterator = mapState.entries().iterator();
+		while (iterator.hasNext()) {
+			Map.Entry<Integer, Long> entry = iterator.next();
+			assertEquals(entry.getValue(), expectedResult.remove(entry.getKey()));
+			iterator.remove();
+		}
+		assertTrue(expectedResult.isEmpty());
+		assertTrue(mapState.isEmpty());
+	}
+
+	/**
+	 * Tests to cover case when user misuse optimizeForPointLookup with peek operations on priority queue.
+	 *
+	 * <p>The option {@link ColumnFamilyOptions#optimizeForPointLookup(long)} would lead to iterator.seek with prefix bytes invalid.
+	 */
+	@Test
+	public void testMisuseOptimizePointLookupWithPriorityQueue() throws IOException {
+		RocksDBStateBackend rocksDBStateBackend = createStateBackendWithOptimizePointLookup();
+		RocksDBKeyedStateBackend<Integer> keyedStateBackend = createKeyedStateBackend(rocksDBStateBackend, new MockEnvironmentBuilder().build(), IntSerializer.INSTANCE);
+		KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<Integer, VoidNamespace>> priorityQueue =
+			keyedStateBackend.create("timer", new TimerSerializer<>(keyedStateBackend.getKeySerializer(), VoidNamespaceSerializer.INSTANCE));
+
+		PriorityQueue<TimerHeapInternalTimer<Integer, VoidNamespace>> expectedPriorityQueue = new PriorityQueue<>((o1, o2) -> (int) (o1.getTimestamp() - o2.getTimestamp()));
+		// ensure we insert timers more than cache capacity.
+		int queueSize = RocksDBPriorityQueueSetFactory.DEFAULT_CACHES_SIZE + 42;
+		List<Integer> timeStamps = IntStream.range(0, queueSize).boxed().collect(Collectors.toList());
+		Collections.shuffle(timeStamps);
+		for (Integer timeStamp : timeStamps) {
+			TimerHeapInternalTimer<Integer, VoidNamespace> timer = new TimerHeapInternalTimer<>(timeStamp, timeStamp, VoidNamespace.INSTANCE);
+			priorityQueue.add(timer);
+			expectedPriorityQueue.add(timer);
+		}
+		assertEquals(queueSize, priorityQueue.size());
+		TimerHeapInternalTimer<Integer, VoidNamespace> timer;
+		while ((timer = priorityQueue.poll()) != null) {
+			assertEquals(expectedPriorityQueue.poll(), timer);
+		}
+		assertTrue(expectedPriorityQueue.isEmpty());
+		assertTrue(priorityQueue.isEmpty());
+
+	}
+
+	private RocksDBStateBackend createStateBackendWithOptimizePointLookup() throws IOException {
+		RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(tempFolder.newFolder().toURI(), true);
+		rocksDBStateBackend.setPriorityQueueStateType(RocksDBStateBackend.PriorityQueueStateType.ROCKSDB);
+		rocksDBStateBackend.setRocksDBOptions(new RocksDBOptionsFactory() {
+
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public DBOptions createDBOptions(DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
+				return currentOptions;
+			}
+
+			@Override
+			public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
+				return currentOptions.optimizeForPointLookup(64);
+			}
+		});
+		return rocksDBStateBackend;
+	}
+}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBTestUtils.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBTestUtils.java
index ed44a73..cee56aa 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBTestUtils.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBTestUtils.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.state.KeyGroupRange;
@@ -35,6 +36,7 @@ import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.RocksDB;
 
 import java.io.File;
+import java.io.IOException;
 import java.util.Collections;
 
 /**
@@ -98,4 +100,23 @@ public final class RocksDBTestUtils {
 				defaultCFHandle,
 				new CloseableRegistry());
 	}
+
+	public static <K> RocksDBKeyedStateBackend<K> createKeyedStateBackend(
+			RocksDBStateBackend rocksDbBackend,
+			Environment env,
+			TypeSerializer<K> keySerializer) throws IOException {
+
+		return (RocksDBKeyedStateBackend<K>) rocksDbBackend.createKeyedStateBackend(
+			env,
+			env.getJobID(),
+			"test_op",
+			keySerializer,
+			1,
+			new KeyGroupRange(0, 0),
+			env.getTaskKvStateRegistry(),
+			TtlTimeProvider.DEFAULT,
+			new UnregisteredMetricsGroup(),
+			Collections.emptyList(),
+			new CloseableRegistry());
+	}
 }
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksKeyGroupsRocksSingleStateIteratorTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksKeyGroupsRocksSingleStateIteratorTest.java
index 4447f08..2c73fc7 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksKeyGroupsRocksSingleStateIteratorTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksKeyGroupsRocksSingleStateIteratorTest.java
@@ -30,6 +30,7 @@ import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 import org.rocksdb.ColumnFamilyDescriptor;
 import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
 
 import java.io.DataOutputStream;
@@ -74,7 +75,8 @@ public class RocksKeyGroupsRocksSingleStateIteratorTest {
 	public void testMergeIterator(int maxParallelism) throws Exception {
 		Random random = new Random(1234);
 
-		try (RocksDB rocksDB = RocksDB.open(tempFolder.getRoot().getAbsolutePath())) {
+		try (ReadOptions readOptions = new ReadOptions();
+			RocksDB rocksDB = RocksDB.open(tempFolder.getRoot().getAbsolutePath())) {
 			List<Tuple2<RocksIteratorWrapper, Integer>> rocksIteratorsWithKVStateId = new ArrayList<>();
 			List<Tuple2<ColumnFamilyHandle, Integer>> columnFamilyHandlesWithKeyCount = new ArrayList<>();
 
@@ -108,7 +110,7 @@ public class RocksKeyGroupsRocksSingleStateIteratorTest {
 
 			int id = 0;
 			for (Tuple2<ColumnFamilyHandle, Integer> columnFamilyHandle : columnFamilyHandlesWithKeyCount) {
-				rocksIteratorsWithKVStateId.add(new Tuple2<>(RocksDBOperationUtils.getRocksIterator(rocksDB, columnFamilyHandle.f0), id));
+				rocksIteratorsWithKVStateId.add(new Tuple2<>(RocksDBOperationUtils.getRocksIterator(rocksDB, columnFamilyHandle.f0, readOptions), id));
 				++id;
 			}
 
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java
index 96003d6..ce06d1f 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java
@@ -19,7 +19,6 @@
 package org.apache.flink.contrib.streaming.state.benchmark;
 
 import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
-import org.apache.flink.contrib.streaming.state.RocksDBOperationUtils;
 import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper;
 import org.apache.flink.core.memory.MemoryUtils;
 import org.apache.flink.testutils.junit.RetryOnFailure;
@@ -167,7 +166,7 @@ public class RocksDBPerformanceTest extends TestLogger {
 
 			int pos = 0;
 
-			try (final RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(rocksDB)) {
+			try (final RocksIteratorWrapper iterator = new RocksIteratorWrapper(rocksDB.newIterator())) {
 				// seek to start
 				unsafe.putInt(keyTemplate, offset, 0);
 				iterator.seek(keyTemplate);