You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by li...@apache.org on 2020/06/16 08:31:23 UTC

[flink] branch release-1.11 updated (fbae8fc -> 8f31729)

This is an automated email from the ASF dual-hosted git repository.

liyu pushed a change to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from fbae8fc  [FLINK-17666][table-planner-blink] Insert into partitioned table can fail with select *
     new 5c0de8d  [FLINK-17800][rocksdb] Ensure total order seek to avoid user misuse
     new 8f31729  [FLINK-17800][rocksdb] Support customized RocksDB write/read options and use RocksDBResourceContainer to get them

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../state/RocksDBCachingPriorityQueueSet.java      |   8 +-
 .../state/RocksDBIncrementalCheckpointUtils.java   |   4 +-
 .../streaming/state/RocksDBKeyedStateBackend.java  |  21 ++-
 .../state/RocksDBKeyedStateBackendBuilder.java     |  16 +--
 .../contrib/streaming/state/RocksDBMapState.java   |   6 +-
 .../streaming/state/RocksDBOperationUtils.java     |  15 ++-
 .../streaming/state/RocksDBOptionsFactory.java     |  34 +++++
 .../state/RocksDBPriorityQueueSetFactory.java      |   9 +-
 .../streaming/state/RocksDBResourceContainer.java  |  34 +++++
 .../RocksDBIncrementalRestoreOperation.java        |   7 +-
 ...rtitionedPriorityQueueWithRocksDBStoreTest.java |   1 +
 .../contrib/streaming/state/RocksDBResource.java   |   4 +-
 .../state/RocksDBResourceContainerTest.java        |  14 ++
 .../state/RocksDBRocksStateKeysIteratorTest.java   |   2 +-
 .../state/RocksDBStateBackendConfigTest.java       |  28 +---
 .../state/RocksDBStateMisuseOptionTest.java        | 147 +++++++++++++++++++++
 .../contrib/streaming/state/RocksDBTestUtils.java  |  21 +++
 ...RocksKeyGroupsRocksSingleStateIteratorTest.java |   6 +-
 .../state/benchmark/RocksDBPerformanceTest.java    |   3 +-
 19 files changed, 327 insertions(+), 53 deletions(-)
 create mode 100644 flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateMisuseOptionTest.java


[flink] 02/02: [FLINK-17800][rocksdb] Support customized RocksDB write/read options and use RocksDBResourceContainer to get them

Posted by li...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

liyu pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8f3172962c5033d4a7dbf5232c18d3e872821c02
Author: Yu Li <li...@apache.org>
AuthorDate: Tue Jun 16 14:01:29 2020 +0800

    [FLINK-17800][rocksdb] Support customized RocksDB write/read options and use RocksDBResourceContainer to get them
---
 .../streaming/state/RocksDBKeyedStateBackend.java  |  8 ++---
 .../state/RocksDBKeyedStateBackendBuilder.java     | 17 ++---------
 .../streaming/state/RocksDBOptionsFactory.java     | 34 ++++++++++++++++++++++
 .../streaming/state/RocksDBResourceContainer.java  | 34 ++++++++++++++++++++++
 .../state/RocksDBResourceContainerTest.java        | 14 +++++++++
 5 files changed, 86 insertions(+), 21 deletions(-)

diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index f0cce0b..24b897a 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -219,8 +219,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		ExecutionConfig executionConfig,
 		TtlTimeProvider ttlTimeProvider,
 		RocksDB db,
-		WriteOptions writeOptions,
-		ReadOptions readOptions,
 		LinkedHashMap<String, RocksDbKvStateInfo> kvStateInformation,
 		int keyGroupPrefixBytes,
 		CloseableRegistry cancelStreamRegistry,
@@ -259,8 +257,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		this.keyGroupPrefixBytes = keyGroupPrefixBytes;
 		this.kvStateInformation = kvStateInformation;
 
-		this.writeOptions = writeOptions;
-		this.readOptions = readOptions;
+		this.writeOptions = optionsContainer.getWriteOptions();
+		this.readOptions = optionsContainer.getReadOptions();
 		checkArgument(writeBatchSize >= 0, "Write batch size have to be no negative value.");
 		this.writeBatchSize = writeBatchSize;
 		this.db = db;
@@ -370,8 +368,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			columnFamilyOptions.forEach(IOUtils::closeQuietly);
 
 			IOUtils.closeQuietly(optionsContainer);
-			IOUtils.closeQuietly(readOptions);
-			IOUtils.closeQuietly(writeOptions);
 
 			ttlCompactFiltersManager.disposeAndClearRegisteredCompactionFactories();
 
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
index b9bdbb8..1846bde 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
@@ -54,9 +54,7 @@ import org.apache.flink.util.ResourceGuard;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
-import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
-import org.rocksdb.WriteOptions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -243,9 +241,6 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
 		ColumnFamilyHandle defaultColumnFamilyHandle = null;
 		RocksDBNativeMetricMonitor nativeMetricMonitor = null;
 		CloseableRegistry cancelStreamRegistryForBackend = new CloseableRegistry();
-		// The write options to use in the states.
-		WriteOptions writeOptions = null;
-		ReadOptions readOptions = null;
 		LinkedHashMap<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation = new LinkedHashMap<>();
 		RocksDB db = null;
 		AbstractRocksDBRestoreOperation restoreOperation = null;
@@ -283,9 +278,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
 				}
 			}
 
-			writeOptions = new WriteOptions().setDisableWAL(true);
-			readOptions = RocksDBOperationUtils.createTotalOrderSeekReadOptions();
-			writeBatchWrapper = new RocksDBWriteBatchWrapper(db, writeOptions, writeBatchSize);
+			writeBatchWrapper = new RocksDBWriteBatchWrapper(db, optionsContainer.getWriteOptions(), writeBatchSize);
 			// it is important that we only create the key builder after the restore, and not before;
 			// restore operations may reconfigure the key serializer, so accessing the key serializer
 			// only now we can be certain that the key serializer used in the builder is final.
@@ -301,7 +294,6 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
 				keyGroupPrefixBytes,
 				kvStateInformation,
 				db,
-				readOptions,
 				writeBatchWrapper,
 				nativeMetricMonitor);
 		} catch (Throwable e) {
@@ -321,8 +313,6 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
 			IOUtils.closeQuietly(restoreOperation);
 			IOUtils.closeAllQuietly(columnFamilyOptions);
 			IOUtils.closeQuietly(optionsContainer);
-			IOUtils.closeQuietly(readOptions);
-			IOUtils.closeQuietly(writeOptions);
 			ttlCompactFiltersManager.disposeAndClearRegisteredCompactionFactories();
 			kvStateInformation.clear();
 			try {
@@ -353,8 +343,6 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
 			this.executionConfig,
 			this.ttlTimeProvider,
 			db,
-			writeOptions,
-			readOptions,
 			kvStateInformation,
 			keyGroupPrefixBytes,
 			cancelStreamRegistryForBackend,
@@ -483,7 +471,6 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
 		int keyGroupPrefixBytes,
 		Map<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation,
 		RocksDB db,
-		ReadOptions readOptions,
 		RocksDBWriteBatchWrapper writeBatchWrapper,
 		RocksDBNativeMetricMonitor nativeMetricMonitor) {
 		PriorityQueueSetFactory priorityQueueFactory;
@@ -498,7 +485,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
 					numberOfKeyGroups,
 					kvStateInformation,
 					db,
-					readOptions,
+					optionsContainer.getReadOptions(),
 					writeBatchWrapper,
 					nativeMetricMonitor,
 					columnFamilyOptionsFactory
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactory.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactory.java
index a5fd1e9..5ee5098 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactory.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactory.java
@@ -20,6 +20,8 @@ package org.apache.flink.contrib.streaming.state;
 
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.WriteOptions;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -93,6 +95,38 @@ public interface RocksDBOptionsFactory extends OptionsFactory, java.io.Serializa
 		return nativeMetricOptions;
 	}
 
+	/**
+	 * This method should set the additional options on top of the current options object.
+	 * The current options object may contain pre-defined options based on flags that have
+	 * been configured on the state backend.
+	 *
+	 * <p>It is important to set the options on the current object and return the result from
+	 * the setter methods, otherwise the pre-defined options may get lost.
+	 *
+	 * @param currentOptions The options object with the pre-defined options.
+	 * @param handlesToClose The collection to register newly created {@link org.rocksdb.RocksObject}s.
+	 * @return The options object on which the additional options are set.
+	 */
+	default WriteOptions createWriteOptions(WriteOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
+		return currentOptions;
+	}
+
+	/**
+	 * This method should set the additional options on top of the current options object.
+	 * The current options object may contain pre-defined options based on flags that have
+	 * been configured on the state backend.
+	 *
+	 * <p>It is important to set the options on the current object and return the result from
+	 * the setter methods, otherwise the pre-defined options may get lost.
+	 *
+	 * @param currentOptions The options object with the pre-defined options.
+	 * @param handlesToClose The collection to register newly created {@link org.rocksdb.RocksObject}s.
+	 * @return The options object on which the additional options are set.
+	 */
+	default ReadOptions createReadOptions(ReadOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
+		return currentOptions;
+	}
+
 	// ------------------------------------------------------------------------
 	//  for compatibility
 	// ------------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
index c5227a5..8c4d957 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
@@ -26,7 +26,9 @@ import org.rocksdb.BlockBasedTableConfig;
 import org.rocksdb.Cache;
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
+import org.rocksdb.ReadOptions;
 import org.rocksdb.TableFormatConfig;
+import org.rocksdb.WriteOptions;
 
 import javax.annotation.Nullable;
 
@@ -138,6 +140,38 @@ public final class RocksDBResourceContainer implements AutoCloseable {
 		return opt;
 	}
 
+	/**
+	 * Gets the RocksDB {@link WriteOptions} to be used for write operations.
+	 */
+	public WriteOptions getWriteOptions() {
+		// Disable WAL by default
+		WriteOptions opt = new WriteOptions().setDisableWAL(true);
+		handlesToClose.add(opt);
+
+		// add user-defined options factory, if specified
+		if (optionsFactory != null) {
+			opt = optionsFactory.createWriteOptions(opt, handlesToClose);
+		}
+
+		return opt;
+	}
+
+	/**
+	 * Gets the RocksDB {@link ReadOptions} to be used for read operations.
+	 */
+	public ReadOptions getReadOptions() {
+		// We ensure total order seek by default to prevent user misuse, see FLINK-17800 for more details
+		ReadOptions opt = RocksDBOperationUtils.createTotalOrderSeekReadOptions();
+		handlesToClose.add(opt);
+
+		// add user-defined options factory, if specified
+		if (optionsFactory != null) {
+			opt = optionsFactory.createReadOptions(opt, handlesToClose);
+		}
+
+		return opt;
+	}
+
 	RocksDBNativeMetricOptions getMemoryWatcherOptions(RocksDBNativeMetricOptions defaultMetricOptions) {
 		return optionsFactory == null
 				? defaultMetricOptions
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainerTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainerTest.java
index 73d4bab..eb76e62 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainerTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainerTest.java
@@ -31,7 +31,9 @@ import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
 import org.rocksdb.LRUCache;
 import org.rocksdb.NativeLibraryLoader;
+import org.rocksdb.ReadOptions;
 import org.rocksdb.WriteBufferManager;
+import org.rocksdb.WriteOptions;
 
 import java.io.IOException;
 import java.lang.reflect.Field;
@@ -246,4 +248,16 @@ public class RocksDBResourceContainerTest {
 		assertThat(cache.isOwningHandle(), is(false));
 		assertThat(wbm.isOwningHandle(), is(false));
 	}
+
+	@Test
+	public void testFreeWriteReadOptionsAfterClose() throws Exception {
+		RocksDBResourceContainer container = new RocksDBResourceContainer();
+		WriteOptions writeOptions = container.getWriteOptions();
+		ReadOptions readOptions = container.getReadOptions();
+		assertThat(writeOptions.isOwningHandle(), is(true));
+		assertThat(readOptions.isOwningHandle(), is(true));
+		container.close();
+		assertThat(writeOptions.isOwningHandle(), is(false));
+		assertThat(readOptions.isOwningHandle(), is(false));
+	}
 }


[flink] 01/02: [FLINK-17800][rocksdb] Ensure total order seek to avoid user misuse

Posted by li...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

liyu pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5c0de8d6d5a9eea1a779cf3703412f522bece54c
Author: Yun Tang <my...@live.com>
AuthorDate: Fri Jun 5 01:16:27 2020 +0800

    [FLINK-17800][rocksdb] Ensure total order seek to avoid user misuse
---
 .../state/RocksDBCachingPriorityQueueSet.java      |   8 +-
 .../state/RocksDBIncrementalCheckpointUtils.java   |   4 +-
 .../streaming/state/RocksDBKeyedStateBackend.java  |  23 +++-
 .../state/RocksDBKeyedStateBackendBuilder.java     |  17 ++-
 .../contrib/streaming/state/RocksDBMapState.java   |   6 +-
 .../streaming/state/RocksDBOperationUtils.java     |  15 ++-
 .../state/RocksDBPriorityQueueSetFactory.java      |   9 +-
 .../RocksDBIncrementalRestoreOperation.java        |   7 +-
 ...rtitionedPriorityQueueWithRocksDBStoreTest.java |   1 +
 .../contrib/streaming/state/RocksDBResource.java   |   4 +-
 .../state/RocksDBRocksStateKeysIteratorTest.java   |   2 +-
 .../state/RocksDBStateBackendConfigTest.java       |  28 +---
 .../state/RocksDBStateMisuseOptionTest.java        | 147 +++++++++++++++++++++
 .../contrib/streaming/state/RocksDBTestUtils.java  |  21 +++
 ...RocksKeyGroupsRocksSingleStateIteratorTest.java |   6 +-
 .../state/benchmark/RocksDBPerformanceTest.java    |   3 +-
 16 files changed, 255 insertions(+), 46 deletions(-)

diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBCachingPriorityQueueSet.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBCachingPriorityQueueSet.java
index 364185a..fb9a833 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBCachingPriorityQueueSet.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBCachingPriorityQueueSet.java
@@ -29,6 +29,7 @@ import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.shaded.guava18.com.google.common.primitives.UnsignedBytes;
 
 import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
 
@@ -63,6 +64,9 @@ public class RocksDBCachingPriorityQueueSet<E extends HeapPriorityQueueElement>
 	@Nonnull
 	private final RocksDB db;
 
+	@Nonnull
+	private final ReadOptions readOptions;
+
 	/** Handle to the column family of the RocksDB instance in which the elements are stored. */
 	@Nonnull
 	private final ColumnFamilyHandle columnFamilyHandle;
@@ -112,6 +116,7 @@ public class RocksDBCachingPriorityQueueSet<E extends HeapPriorityQueueElement>
 		@Nonnegative int keyGroupId,
 		@Nonnegative int keyGroupPrefixBytes,
 		@Nonnull RocksDB db,
+		@Nonnull ReadOptions readOptions,
 		@Nonnull ColumnFamilyHandle columnFamilyHandle,
 		@Nonnull TypeSerializer<E> byteOrderProducingSerializer,
 		@Nonnull DataOutputSerializer outputStream,
@@ -119,6 +124,7 @@ public class RocksDBCachingPriorityQueueSet<E extends HeapPriorityQueueElement>
 		@Nonnull RocksDBWriteBatchWrapper batchWrapper,
 		@Nonnull OrderedByteArraySetCache orderedByteArraySetCache) {
 		this.db = db;
+		this.readOptions = readOptions;
 		this.columnFamilyHandle = columnFamilyHandle;
 		this.byteOrderProducingSerializer = byteOrderProducingSerializer;
 		this.batchWrapper = batchWrapper;
@@ -304,7 +310,7 @@ public class RocksDBCachingPriorityQueueSet<E extends HeapPriorityQueueElement>
 		flushWriteBatch();
 		return new RocksBytesIterator(
 			new RocksIteratorWrapper(
-				db.newIterator(columnFamilyHandle)));
+				db.newIterator(columnFamilyHandle, readOptions)));
 	}
 
 	/**
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java
index 1f43dd0..5bce695 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java
@@ -21,6 +21,7 @@ import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 
 import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
 
@@ -116,7 +117,8 @@ public class RocksDBIncrementalCheckpointUtils {
 		@Nonnegative long writeBatchSize) throws RocksDBException {
 
 		for (ColumnFamilyHandle columnFamilyHandle : columnFamilyHandles) {
-			try (RocksIteratorWrapper iteratorWrapper = RocksDBOperationUtils.getRocksIterator(db, columnFamilyHandle);
+			try (ReadOptions readOptions = RocksDBOperationUtils.createTotalOrderSeekReadOptions();
+				RocksIteratorWrapper iteratorWrapper = RocksDBOperationUtils.getRocksIterator(db, columnFamilyHandle, readOptions);
 				RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(db, writeBatchSize)) {
 
 				iteratorWrapper.seek(beginKeyBytes);
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 61d8688..f0cce0b 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -66,6 +66,7 @@ import org.apache.flink.util.StateMigrationException;
 
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
 import org.rocksdb.Snapshot;
@@ -150,6 +151,12 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	private final WriteOptions writeOptions;
 
 	/**
+	 * The read options to use when creating iterators.
+	 * We ensure total order seek to guard against user misuse; see FLINK-17800 for more details.
+	 */
+	private final ReadOptions readOptions;
+
+	/**
 	 * The max memory size for one batch in {@link RocksDBWriteBatchWrapper}.
 	 */
 	private final long writeBatchSize;
@@ -212,6 +219,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		ExecutionConfig executionConfig,
 		TtlTimeProvider ttlTimeProvider,
 		RocksDB db,
+		WriteOptions writeOptions,
+		ReadOptions readOptions,
 		LinkedHashMap<String, RocksDbKvStateInfo> kvStateInformation,
 		int keyGroupPrefixBytes,
 		CloseableRegistry cancelStreamRegistry,
@@ -250,7 +259,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		this.keyGroupPrefixBytes = keyGroupPrefixBytes;
 		this.kvStateInformation = kvStateInformation;
 
-		this.writeOptions = new WriteOptions().setDisableWAL(true);
+		this.writeOptions = writeOptions;
+		this.readOptions = readOptions;
 		checkArgument(writeBatchSize >= 0, "Write batch size have to be no negative value.");
 		this.writeBatchSize = writeBatchSize;
 		this.db = db;
@@ -290,7 +300,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			throw new FlinkRuntimeException("Failed to get keys from RocksDB state backend.", ex);
 		}
 
-		RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(db, columnInfo.columnFamilyHandle);
+		RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(db, columnInfo.columnFamilyHandle, readOptions);
 		iterator.seekToFirst();
 
 		final RocksStateKeysIterator<K> iteratorWrapper = new RocksStateKeysIterator<>(iterator, state, getKeySerializer(), keyGroupPrefixBytes,
@@ -360,6 +370,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			columnFamilyOptions.forEach(IOUtils::closeQuietly);
 
 			IOUtils.closeQuietly(optionsContainer);
+			IOUtils.closeQuietly(readOptions);
 			IOUtils.closeQuietly(writeOptions);
 
 			ttlCompactFiltersManager.disposeAndClearRegisteredCompactionFactories();
@@ -407,6 +418,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		return writeOptions;
 	}
 
+	public ReadOptions getReadOptions() {
+		return readOptions;
+	}
+
 	RocksDBSerializedCompositeKeyBuilder<K> getSharedRocksKeyBuilder() {
 		return sharedRocksKeyBuilder;
 	}
@@ -606,7 +621,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 		Snapshot rocksDBSnapshot = db.getSnapshot();
 		try (
-			RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(db, stateMetaInfo.f0);
+			RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(db, stateMetaInfo.f0, readOptions);
 			RocksDBWriteBatchWrapper batchWriter = new RocksDBWriteBatchWrapper(db, getWriteOptions(), getWriteBatchSize())
 		) {
 			iterator.seekToFirst();
@@ -681,7 +696,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 		for (RocksDbKvStateInfo metaInfo : kvStateInformation.values()) {
 			//TODO maybe filterOrTransform only for k/v states
-			try (RocksIteratorWrapper rocksIterator = RocksDBOperationUtils.getRocksIterator(db, metaInfo.columnFamilyHandle)) {
+			try (RocksIteratorWrapper rocksIterator = RocksDBOperationUtils.getRocksIterator(db, metaInfo.columnFamilyHandle, readOptions)) {
 				rocksIterator.seekToFirst();
 
 				while (rocksIterator.isValid()) {
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
index d1a0184..b9bdbb8 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
@@ -54,6 +54,7 @@ import org.apache.flink.util.ResourceGuard;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
+import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
 import org.rocksdb.WriteOptions;
 import org.slf4j.Logger;
@@ -244,6 +245,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
 		CloseableRegistry cancelStreamRegistryForBackend = new CloseableRegistry();
 		// The write options to use in the states.
 		WriteOptions writeOptions = null;
+		ReadOptions readOptions = null;
 		LinkedHashMap<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation = new LinkedHashMap<>();
 		RocksDB db = null;
 		AbstractRocksDBRestoreOperation restoreOperation = null;
@@ -282,6 +284,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
 			}
 
 			writeOptions = new WriteOptions().setDisableWAL(true);
+			readOptions = RocksDBOperationUtils.createTotalOrderSeekReadOptions();
 			writeBatchWrapper = new RocksDBWriteBatchWrapper(db, writeOptions, writeBatchSize);
 			// it is important that we only create the key builder after the restore, and not before;
 			// restore operations may reconfigure the key serializer, so accessing the key serializer
@@ -294,8 +297,13 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
 			snapshotStrategy = initializeSavepointAndCheckpointStrategies(cancelStreamRegistryForBackend, rocksDBResourceGuard,
 				kvStateInformation, keyGroupPrefixBytes, db, backendUID, materializedSstFiles, lastCompletedCheckpointId);
 			// init priority queue factory
-			priorityQueueFactory = initPriorityQueueFactory(keyGroupPrefixBytes, kvStateInformation, db,
-				writeBatchWrapper, nativeMetricMonitor);
+			priorityQueueFactory = initPriorityQueueFactory(
+				keyGroupPrefixBytes,
+				kvStateInformation,
+				db,
+				readOptions,
+				writeBatchWrapper,
+				nativeMetricMonitor);
 		} catch (Throwable e) {
 			// Do clean up
 			List<ColumnFamilyOptions> columnFamilyOptions = new ArrayList<>(kvStateInformation.values().size());
@@ -313,6 +321,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
 			IOUtils.closeQuietly(restoreOperation);
 			IOUtils.closeAllQuietly(columnFamilyOptions);
 			IOUtils.closeQuietly(optionsContainer);
+			IOUtils.closeQuietly(readOptions);
 			IOUtils.closeQuietly(writeOptions);
 			ttlCompactFiltersManager.disposeAndClearRegisteredCompactionFactories();
 			kvStateInformation.clear();
@@ -344,6 +353,8 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
 			this.executionConfig,
 			this.ttlTimeProvider,
 			db,
+			writeOptions,
+			readOptions,
 			kvStateInformation,
 			keyGroupPrefixBytes,
 			cancelStreamRegistryForBackend,
@@ -472,6 +483,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
 		int keyGroupPrefixBytes,
 		Map<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation,
 		RocksDB db,
+		ReadOptions readOptions,
 		RocksDBWriteBatchWrapper writeBatchWrapper,
 		RocksDBNativeMetricMonitor nativeMetricMonitor) {
 		PriorityQueueSetFactory priorityQueueFactory;
@@ -486,6 +498,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
 					numberOfKeyGroups,
 					kvStateInformation,
 					db,
+					readOptions,
 					writeBatchWrapper,
 					nativeMetricMonitor,
 					columnFamilyOptionsFactory
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
index f0b47cd..6c7f541 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
@@ -236,7 +236,7 @@ class RocksDBMapState<K, N, UK, UV>
 	public boolean isEmpty() {
 		final byte[] prefixBytes = serializeCurrentKeyWithGroupAndNamespace();
 
-		try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(backend.db, columnFamily)) {
+		try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(backend.db, columnFamily, backend.getReadOptions())) {
 
 			iterator.seek(prefixBytes);
 
@@ -247,7 +247,7 @@ class RocksDBMapState<K, N, UK, UV>
 	@Override
 	public void clear() {
 		try {
-			try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(backend.db, columnFamily);
+			try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(backend.db, columnFamily, backend.getReadOptions());
 				RocksDBWriteBatchWrapper rocksDBWriteBatchWrapper = new RocksDBWriteBatchWrapper(backend.db, backend.getWriteOptions(), backend.getWriteBatchSize())) {
 
 				final byte[] keyPrefixBytes = serializeCurrentKeyWithGroupAndNamespace();
@@ -570,7 +570,7 @@ class RocksDBMapState<K, N, UK, UV>
 
 			// use try-with-resources to ensure the RocksIterator is released even if a runtime exception
 			// occurs in the code block below.
-			try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(db, columnFamily)) {
+			try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(db, columnFamily, backend.getReadOptions())) {
 
 				/*
 				 * The iteration starts from the prefix bytes at the first loading. After #nextEntry() is called,
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java
index 1455c1b..0f564d5 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java
@@ -32,6 +32,7 @@ import org.rocksdb.ColumnFamilyDescriptor;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
+import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
 import org.slf4j.Logger;
@@ -94,12 +95,18 @@ public class RocksDBOperationUtils {
 		return dbRef;
 	}
 
-	public static RocksIteratorWrapper getRocksIterator(RocksDB db) {
-		return new RocksIteratorWrapper(db.newIterator());
+	public static RocksIteratorWrapper getRocksIterator(RocksDB db, ColumnFamilyHandle columnFamilyHandle, ReadOptions readOptions) {
+		return new RocksIteratorWrapper(db.newIterator(columnFamilyHandle, readOptions));
 	}
 
-	public static RocksIteratorWrapper getRocksIterator(RocksDB db, ColumnFamilyHandle columnFamilyHandle) {
-		return new RocksIteratorWrapper(db.newIterator(columnFamilyHandle));
+	/**
+	 * Creates {@link ReadOptions} with total-order seek enabled to guard against user misuse, see FLINK-17800 for more details.
+	 *
+	 * <p>Note: the caller must close the returned {@link ReadOptions} once it is no longer needed, e.g. on dispose.
+	 */
+	// TODO Remove this method once RocksDB is bumped to a version newer than 6.2.2.
+	public static ReadOptions createTotalOrderSeekReadOptions() {
+		return new ReadOptions().setTotalOrderSeek(true);
 	}
 
 	public static void registerKvStateInformation(
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBPriorityQueueSetFactory.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBPriorityQueueSetFactory.java
index 1cbeebe..59daff8 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBPriorityQueueSetFactory.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBPriorityQueueSetFactory.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.contrib.streaming.state;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
 import org.apache.flink.core.memory.DataInputDeserializer;
@@ -36,6 +37,7 @@ import org.apache.flink.util.StateMigrationException;
 
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
 
 import javax.annotation.Nonnull;
@@ -52,7 +54,8 @@ public class RocksDBPriorityQueueSetFactory implements PriorityQueueSetFactory {
 	/**
 	 * Default cache size per key-group.
 	 */
-	private static final int DEFAULT_CACHES_SIZE = 128; //TODO make this configurable
+	@VisibleForTesting
+	static final int DEFAULT_CACHES_SIZE = 128; //TODO make this configurable
 
 	/**
 	 * A shared buffer to serialize elements for the priority queue.
@@ -71,6 +74,7 @@ public class RocksDBPriorityQueueSetFactory implements PriorityQueueSetFactory {
 	private final int numberOfKeyGroups;
 	private final Map<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation;
 	private final RocksDB db;
+	private final ReadOptions readOptions;
 	private final RocksDBWriteBatchWrapper writeBatchWrapper;
 	private final RocksDBNativeMetricMonitor nativeMetricMonitor;
 	private final Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory;
@@ -81,6 +85,7 @@ public class RocksDBPriorityQueueSetFactory implements PriorityQueueSetFactory {
 		int numberOfKeyGroups,
 		Map<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation,
 		RocksDB db,
+		ReadOptions readOptions,
 		RocksDBWriteBatchWrapper writeBatchWrapper,
 		RocksDBNativeMetricMonitor nativeMetricMonitor,
 		Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory) {
@@ -89,6 +94,7 @@ public class RocksDBPriorityQueueSetFactory implements PriorityQueueSetFactory {
 		this.numberOfKeyGroups = numberOfKeyGroups;
 		this.kvStateInformation = kvStateInformation;
 		this.db = db;
+		this.readOptions = readOptions;
 		this.writeBatchWrapper = writeBatchWrapper;
 		this.nativeMetricMonitor = nativeMetricMonitor;
 		this.columnFamilyOptionsFactory = columnFamilyOptionsFactory;
@@ -122,6 +128,7 @@ public class RocksDBPriorityQueueSetFactory implements PriorityQueueSetFactory {
 						keyGroupId,
 						keyGroupPrefixBytes,
 						db,
+						readOptions,
 						columnFamilyHandle,
 						byteOrderedElementSerializer,
 						sharedElementOutView,
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
index f7abed6..53c7537 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
@@ -51,6 +51,7 @@ import org.rocksdb.ColumnFamilyDescriptor;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
+import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
 import org.slf4j.Logger;
@@ -309,7 +310,7 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor
 						null, tmpRestoreDBInfo.stateMetaInfoSnapshots.get(i))
 						.columnFamilyHandle;
 
-					try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(tmpRestoreDBInfo.db, tmpColumnFamilyHandle)) {
+					try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(tmpRestoreDBInfo.db, tmpColumnFamilyHandle, tmpRestoreDBInfo.readOptions)) {
 
 						iterator.seek(startKeyGroupPrefixBytes);
 
@@ -376,6 +377,8 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor
 		@Nonnull
 		private final List<StateMetaInfoSnapshot> stateMetaInfoSnapshots;
 
+		private final ReadOptions readOptions;
+
 		private RestoredDBInstance(
 			@Nonnull RocksDB db,
 			@Nonnull List<ColumnFamilyHandle> columnFamilyHandles,
@@ -386,6 +389,7 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor
 			this.columnFamilyHandles = columnFamilyHandles;
 			this.columnFamilyDescriptors = columnFamilyDescriptors;
 			this.stateMetaInfoSnapshots = stateMetaInfoSnapshots;
+			this.readOptions = RocksDBOperationUtils.createTotalOrderSeekReadOptions();
 		}
 
 		@Override
@@ -397,6 +401,7 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor
 			IOUtils.closeAllQuietly(columnFamilyHandles);
 			IOUtils.closeQuietly(db);
 			IOUtils.closeAllQuietly(columnFamilyOptions);
+			IOUtils.closeQuietly(readOptions);
 		}
 	}
 
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/KeyGroupPartitionedPriorityQueueWithRocksDBStoreTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/KeyGroupPartitionedPriorityQueueWithRocksDBStoreTest.java
index d402c3d..fd68d70 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/KeyGroupPartitionedPriorityQueueWithRocksDBStoreTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/KeyGroupPartitionedPriorityQueueWithRocksDBStoreTest.java
@@ -60,6 +60,7 @@ public class KeyGroupPartitionedPriorityQueueWithRocksDBStoreTest extends Intern
 				keyGroupId,
 				keyGroupPrefixBytes,
 				rocksDBResource.getRocksDB(),
+				rocksDBResource.getReadOptions(),
 				rocksDBResource.getDefaultColumnFamily(),
 				TestElementSerializer.INSTANCE,
 				outputStreamWithPos,
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResource.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResource.java
index d25baa7..3b3b697 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResource.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResource.java
@@ -101,7 +101,7 @@ public class RocksDBResource extends ExternalResource {
 					LOG.error("Close previous ColumnOptions's instance failed.", e);
 				}
 
-				return PredefinedOptions.FLASH_SSD_OPTIMIZED.createColumnOptions(handlesToClose);
+				return PredefinedOptions.FLASH_SSD_OPTIMIZED.createColumnOptions(handlesToClose).optimizeForPointLookup(40960);
 			}
 		});
 	}
@@ -155,7 +155,7 @@ public class RocksDBResource extends ExternalResource {
 			PredefinedOptions.DEFAULT.createColumnOptions(handlesToClose), handlesToClose);
 		this.writeOptions = new WriteOptions();
 		this.writeOptions.disableWAL();
-		this.readOptions = new ReadOptions();
+		this.readOptions = RocksDBOperationUtils.createTotalOrderSeekReadOptions();
 		this.columnFamilyHandles = new ArrayList<>(1);
 		this.rocksDB = RocksDB.open(
 			dbOptions,
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksStateKeysIteratorTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksStateKeysIteratorTest.java
index 8f71b65..1a4808f 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksStateKeysIteratorTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksStateKeysIteratorTest.java
@@ -126,7 +126,7 @@ public class RocksDBRocksStateKeysIteratorTest {
 			ColumnFamilyHandle handle = keyedStateBackend.getColumnFamilyHandle(testStateName);
 
 			try (
-				RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(keyedStateBackend.db, handle);
+				RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(keyedStateBackend.db, handle, keyedStateBackend.getReadOptions());
 				RocksStateKeysIterator<K> iteratorWrapper =
 					new RocksStateKeysIterator<>(
 						iterator,
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
index d3eb6db..197f7ca 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
@@ -61,6 +61,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 
+import static org.apache.flink.contrib.streaming.state.RocksDBTestUtils.createKeyedStateBackend;
 import static org.hamcrest.CoreMatchers.anyOf;
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -123,7 +124,7 @@ public class RocksDBStateBackendConfigTest {
 		assertArrayEquals(new String[] { testDir1, testDir2 }, rocksDbBackend.getDbStoragePaths());
 
 		final MockEnvironment env = getMockEnvironment(tempFolder.newFolder());
-		final RocksDBKeyedStateBackend<Integer> keyedBackend = createKeyedStateBackend(rocksDbBackend, env);
+		final RocksDBKeyedStateBackend<Integer> keyedBackend = createKeyedStateBackend(rocksDbBackend, env, IntSerializer.INSTANCE);
 
 		try {
 			File instanceBasePath = keyedBackend.getInstanceBasePath();
@@ -160,7 +161,7 @@ public class RocksDBStateBackendConfigTest {
 
 		RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(tempFolder.newFolder().toURI().toString());
 
-		RocksDBKeyedStateBackend<Integer> keyedBackend = createKeyedStateBackend(rocksDbBackend, env);
+		RocksDBKeyedStateBackend<Integer> keyedBackend = createKeyedStateBackend(rocksDbBackend, env, IntSerializer.INSTANCE);
 		Assert.assertEquals(RocksDBPriorityQueueSetFactory.class, keyedBackend.getPriorityQueueFactory().getClass());
 		keyedBackend.dispose();
 
@@ -168,7 +169,7 @@ public class RocksDBStateBackendConfigTest {
 		conf.set(RocksDBOptions.TIMER_SERVICE_FACTORY, RocksDBStateBackend.PriorityQueueStateType.HEAP);
 
 		rocksDbBackend = rocksDbBackend.configure(conf, Thread.currentThread().getContextClassLoader());
-		keyedBackend = createKeyedStateBackend(rocksDbBackend, env);
+		keyedBackend = createKeyedStateBackend(rocksDbBackend, env, IntSerializer.INSTANCE);
 		Assert.assertEquals(
 			HeapPriorityQueueSetFactory.class,
 			keyedBackend.getPriorityQueueFactory().getClass());
@@ -197,7 +198,7 @@ public class RocksDBStateBackendConfigTest {
 		final RocksDBStateBackend configuredRocksDBStateBackend = backend.configure(
 			configFromConfFile,
 			Thread.currentThread().getContextClassLoader());
-		final RocksDBKeyedStateBackend<Integer> keyedBackend = createKeyedStateBackend(configuredRocksDBStateBackend, env);
+		final RocksDBKeyedStateBackend<Integer> keyedBackend = createKeyedStateBackend(configuredRocksDBStateBackend, env, IntSerializer.INSTANCE);
 
 		// priorityQueueStateType of the job backend should be preserved
 		assertThat(keyedBackend.getPriorityQueueFactory(), instanceOf(HeapPriorityQueueSetFactory.class));
@@ -254,7 +255,7 @@ public class RocksDBStateBackendConfigTest {
 		rocksDbBackend.setDbStoragePath(configuredPath);
 
 		final MockEnvironment env = getMockEnvironment(tempFolder.newFolder());
-		RocksDBKeyedStateBackend<Integer> keyedBackend = createKeyedStateBackend(rocksDbBackend, env);
+		RocksDBKeyedStateBackend<Integer> keyedBackend = createKeyedStateBackend(rocksDbBackend, env, IntSerializer.INSTANCE);
 
 		try {
 			File instanceBasePath = keyedBackend.getInstanceBasePath();
@@ -725,23 +726,6 @@ public class RocksDBStateBackendConfigTest {
 	//  Utilities
 	// ------------------------------------------------------------------------
 
-	static RocksDBKeyedStateBackend<Integer> createKeyedStateBackend(
-			RocksDBStateBackend rocksDbBackend, Environment env) throws Exception {
-
-		return (RocksDBKeyedStateBackend<Integer>) rocksDbBackend.createKeyedStateBackend(
-			env,
-			env.getJobID(),
-			"test_op",
-			IntSerializer.INSTANCE,
-			1,
-			new KeyGroupRange(0, 0),
-			env.getTaskKvStateRegistry(),
-			TtlTimeProvider.DEFAULT,
-			new UnregisteredMetricsGroup(),
-			Collections.emptyList(),
-			new CloseableRegistry());
-	}
-
 	static MockEnvironment getMockEnvironment(File... tempDirs) {
 		final String[] tempDirStrings = new String[tempDirs.length];
 		for (int i = 0; i < tempDirs.length; i++) {
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateMisuseOptionTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateMisuseOptionTest.java
new file mode 100644
index 0000000..59a4822
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateMisuseOptionTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
+import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.operators.TimerHeapInternalTimer;
+import org.apache.flink.streaming.api.operators.TimerSerializer;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.DBOptions;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.contrib.streaming.state.RocksDBTestUtils.createKeyedStateBackend;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests covering cases where, even if users misuse some options, the RocksDB state backend still works as expected or gives explicit feedback.
+ *
+ * <p>The RocksDB state backend performs some internal operations through RocksDB's APIs that are transparent to users.
+ * However, users can still configure options via {@link RocksDBOptionsFactory}, which might cause some of these
+ * operations to return unexpected results, e.g. FLINK-17800.
+ */
+public class RocksDBStateMisuseOptionTest {
+
+	@Rule
+	public final TemporaryFolder tempFolder = new TemporaryFolder();
+
+	/**
+	 * Covers the case where a user combines optimizeForPointLookup with the iterator interfaces of map state.
+	 *
+	 * <p>The option {@link ColumnFamilyOptions#optimizeForPointLookup(long)} would make iterator.seek with prefix bytes invalid.
+	 */
+	@Test
+	public void testMisuseOptimizePointLookupWithMapState() throws Exception {
+		RocksDBStateBackend rocksDBStateBackend = createStateBackendWithOptimizePointLookup();
+		RocksDBKeyedStateBackend<Integer> keyedStateBackend = createKeyedStateBackend(rocksDBStateBackend, new MockEnvironmentBuilder().build(), IntSerializer.INSTANCE);
+		MapStateDescriptor<Integer, Long> stateDescriptor = new MapStateDescriptor<>("map", IntSerializer.INSTANCE, LongSerializer.INSTANCE);
+		MapState<Integer, Long> mapState = keyedStateBackend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, stateDescriptor);
+
+		keyedStateBackend.setCurrentKey(1);
+		Map<Integer, Long> expectedResult = new HashMap<>();
+		for (int i = 0; i < 100; i++) {
+			long uv = ThreadLocalRandom.current().nextLong();
+			mapState.put(i, uv);
+			expectedResult.put(i, uv);
+		}
+
+		Iterator<Map.Entry<Integer, Long>> iterator = mapState.entries().iterator();
+		while (iterator.hasNext()) {
+			Map.Entry<Integer, Long> entry = iterator.next();
+			assertEquals(expectedResult.remove(entry.getKey()), entry.getValue());
+			iterator.remove();
+		}
+		assertTrue(expectedResult.isEmpty());
+		assertTrue(mapState.isEmpty());
+	}
+
+	/**
+	 * Covers the case where a user combines optimizeForPointLookup with peek operations on the priority queue.
+	 *
+	 * <p>The option {@link ColumnFamilyOptions#optimizeForPointLookup(long)} would make iterator.seek with prefix bytes invalid.
+	 */
+	@Test
+	public void testMisuseOptimizePointLookupWithPriorityQueue() throws IOException {
+		RocksDBStateBackend rocksDBStateBackend = createStateBackendWithOptimizePointLookup();
+		RocksDBKeyedStateBackend<Integer> keyedStateBackend = createKeyedStateBackend(rocksDBStateBackend, new MockEnvironmentBuilder().build(), IntSerializer.INSTANCE);
+		KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<Integer, VoidNamespace>> priorityQueue =
+			keyedStateBackend.create("timer", new TimerSerializer<>(keyedStateBackend.getKeySerializer(), VoidNamespaceSerializer.INSTANCE));
+
+		PriorityQueue<TimerHeapInternalTimer<Integer, VoidNamespace>> expectedPriorityQueue = new PriorityQueue<>((o1, o2) -> Long.compare(o1.getTimestamp(), o2.getTimestamp()));
+		// ensure we insert more timers than the cache capacity.
+		int queueSize = RocksDBPriorityQueueSetFactory.DEFAULT_CACHES_SIZE + 42;
+		List<Integer> timeStamps = IntStream.range(0, queueSize).boxed().collect(Collectors.toList());
+		Collections.shuffle(timeStamps);
+		for (Integer timeStamp : timeStamps) {
+			TimerHeapInternalTimer<Integer, VoidNamespace> timer = new TimerHeapInternalTimer<>(timeStamp, timeStamp, VoidNamespace.INSTANCE);
+			priorityQueue.add(timer);
+			expectedPriorityQueue.add(timer);
+		}
+		assertEquals(queueSize, priorityQueue.size());
+		TimerHeapInternalTimer<Integer, VoidNamespace> timer;
+		while ((timer = priorityQueue.poll()) != null) {
+			assertEquals(expectedPriorityQueue.poll(), timer);
+		}
+		assertTrue(expectedPriorityQueue.isEmpty());
+		assertTrue(priorityQueue.isEmpty());
+
+	}
+
+	private RocksDBStateBackend createStateBackendWithOptimizePointLookup() throws IOException {
+		RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(tempFolder.newFolder().toURI(), true);
+		rocksDBStateBackend.setPriorityQueueStateType(RocksDBStateBackend.PriorityQueueStateType.ROCKSDB);
+		rocksDBStateBackend.setRocksDBOptions(new RocksDBOptionsFactory() {
+
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public DBOptions createDBOptions(DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
+				return currentOptions;
+			}
+
+			@Override
+			public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
+				return currentOptions.optimizeForPointLookup(64);
+			}
+		});
+		return rocksDBStateBackend;
+	}
+}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBTestUtils.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBTestUtils.java
index ed44a73..cee56aa 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBTestUtils.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBTestUtils.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.state.KeyGroupRange;
@@ -35,6 +36,7 @@ import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.RocksDB;
 
 import java.io.File;
+import java.io.IOException;
 import java.util.Collections;
 
 /**
@@ -98,4 +100,23 @@ public final class RocksDBTestUtils {
 				defaultCFHandle,
 				new CloseableRegistry());
 	}
+
+	/** Creates a keyed backend from {@code rocksDbBackend}, passing the fixed arguments "test_op", 1 and KeyGroupRange(0, 0). */
+	public static <K> RocksDBKeyedStateBackend<K> createKeyedStateBackend(
+			RocksDBStateBackend rocksDbBackend,
+			Environment env,
+			TypeSerializer<K> keySerializer) throws IOException {
+
+		final KeyGroupRange singleKeyGroupRange = new KeyGroupRange(0, 0);
+		return (RocksDBKeyedStateBackend<K>) rocksDbBackend.createKeyedStateBackend(
+			env,
+			env.getJobID(),
+			"test_op",
+			keySerializer,
+			1,
+			singleKeyGroupRange,
+			env.getTaskKvStateRegistry(),
+			TtlTimeProvider.DEFAULT,
+			new UnregisteredMetricsGroup(), Collections.emptyList(), new CloseableRegistry());
+	}
 }
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksKeyGroupsRocksSingleStateIteratorTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksKeyGroupsRocksSingleStateIteratorTest.java
index 4447f08..2c73fc7 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksKeyGroupsRocksSingleStateIteratorTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksKeyGroupsRocksSingleStateIteratorTest.java
@@ -30,6 +30,7 @@ import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 import org.rocksdb.ColumnFamilyDescriptor;
 import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
 
 import java.io.DataOutputStream;
@@ -74,7 +75,8 @@ public class RocksKeyGroupsRocksSingleStateIteratorTest {
 	public void testMergeIterator(int maxParallelism) throws Exception {
 		Random random = new Random(1234);
 
-		try (RocksDB rocksDB = RocksDB.open(tempFolder.getRoot().getAbsolutePath())) {
+		try (ReadOptions readOptions = new ReadOptions();
+			RocksDB rocksDB = RocksDB.open(tempFolder.getRoot().getAbsolutePath())) {
 			List<Tuple2<RocksIteratorWrapper, Integer>> rocksIteratorsWithKVStateId = new ArrayList<>();
 			List<Tuple2<ColumnFamilyHandle, Integer>> columnFamilyHandlesWithKeyCount = new ArrayList<>();
 
@@ -108,7 +110,7 @@ public class RocksKeyGroupsRocksSingleStateIteratorTest {
 
 			int id = 0;
 			for (Tuple2<ColumnFamilyHandle, Integer> columnFamilyHandle : columnFamilyHandlesWithKeyCount) {
-				rocksIteratorsWithKVStateId.add(new Tuple2<>(RocksDBOperationUtils.getRocksIterator(rocksDB, columnFamilyHandle.f0), id));
+				rocksIteratorsWithKVStateId.add(new Tuple2<>(RocksDBOperationUtils.getRocksIterator(rocksDB, columnFamilyHandle.f0, readOptions), id));
 				++id;
 			}
 
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java
index 96003d6..ce06d1f 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java
@@ -19,7 +19,6 @@
 package org.apache.flink.contrib.streaming.state.benchmark;
 
 import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
-import org.apache.flink.contrib.streaming.state.RocksDBOperationUtils;
 import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper;
 import org.apache.flink.core.memory.MemoryUtils;
 import org.apache.flink.testutils.junit.RetryOnFailure;
@@ -167,7 +166,7 @@ public class RocksDBPerformanceTest extends TestLogger {
 
 			int pos = 0;
 
-			try (final RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(rocksDB)) {
+			try (final RocksIteratorWrapper iterator = new RocksIteratorWrapper(rocksDB.newIterator())) {
 				// seek to start
 				unsafe.putInt(keyTemplate, offset, 0);
 				iterator.seek(keyTemplate);