You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2020/06/17 12:34:04 UTC

[flink] 02/02: Revert "[FLINK-17800][roksdb] Ensure total order seek to avoid user misuse"

This is an automated email from the ASF dual-hosted git repository.

rmetzger pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f6f51d8767990cee7f1ca052b040530b646c3efe
Author: Robert Metzger <rm...@apache.org>
AuthorDate: Wed Jun 17 10:13:30 2020 +0200

    Revert "[FLINK-17800][roksdb] Ensure total order seek to avoid user misuse"
    
    This reverts commit 8ca388ca0225ff22f532c8a65f97d8cfea027c22.
---
 .../state/RocksDBCachingPriorityQueueSet.java      |   8 +-
 .../state/RocksDBIncrementalCheckpointUtils.java   |   4 +-
 .../streaming/state/RocksDBKeyedStateBackend.java  |  23 +---
 .../state/RocksDBKeyedStateBackendBuilder.java     |  17 +--
 .../contrib/streaming/state/RocksDBMapState.java   |   6 +-
 .../streaming/state/RocksDBOperationUtils.java     |  15 +--
 .../state/RocksDBPriorityQueueSetFactory.java      |   9 +-
 .../RocksDBIncrementalRestoreOperation.java        |   7 +-
 ...rtitionedPriorityQueueWithRocksDBStoreTest.java |   1 -
 .../contrib/streaming/state/RocksDBResource.java   |   4 +-
 .../state/RocksDBRocksStateKeysIteratorTest.java   |   2 +-
 .../state/RocksDBStateBackendConfigTest.java       |  28 +++-
 .../state/RocksDBStateMisuseOptionTest.java        | 147 ---------------------
 .../contrib/streaming/state/RocksDBTestUtils.java  |  21 ---
 ...RocksKeyGroupsRocksSingleStateIteratorTest.java |   6 +-
 .../state/benchmark/RocksDBPerformanceTest.java    |   3 +-
 16 files changed, 46 insertions(+), 255 deletions(-)

diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBCachingPriorityQueueSet.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBCachingPriorityQueueSet.java
index fb9a833..364185a 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBCachingPriorityQueueSet.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBCachingPriorityQueueSet.java
@@ -29,7 +29,6 @@ import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.shaded.guava18.com.google.common.primitives.UnsignedBytes;
 
 import org.rocksdb.ColumnFamilyHandle;
-import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
 
@@ -64,9 +63,6 @@ public class RocksDBCachingPriorityQueueSet<E extends HeapPriorityQueueElement>
 	@Nonnull
 	private final RocksDB db;
 
-	@Nonnull
-	private final ReadOptions readOptions;
-
 	/** Handle to the column family of the RocksDB instance in which the elements are stored. */
 	@Nonnull
 	private final ColumnFamilyHandle columnFamilyHandle;
@@ -116,7 +112,6 @@ public class RocksDBCachingPriorityQueueSet<E extends HeapPriorityQueueElement>
 		@Nonnegative int keyGroupId,
 		@Nonnegative int keyGroupPrefixBytes,
 		@Nonnull RocksDB db,
-		@Nonnull ReadOptions readOptions,
 		@Nonnull ColumnFamilyHandle columnFamilyHandle,
 		@Nonnull TypeSerializer<E> byteOrderProducingSerializer,
 		@Nonnull DataOutputSerializer outputStream,
@@ -124,7 +119,6 @@ public class RocksDBCachingPriorityQueueSet<E extends HeapPriorityQueueElement>
 		@Nonnull RocksDBWriteBatchWrapper batchWrapper,
 		@Nonnull OrderedByteArraySetCache orderedByteArraySetCache) {
 		this.db = db;
-		this.readOptions = readOptions;
 		this.columnFamilyHandle = columnFamilyHandle;
 		this.byteOrderProducingSerializer = byteOrderProducingSerializer;
 		this.batchWrapper = batchWrapper;
@@ -310,7 +304,7 @@ public class RocksDBCachingPriorityQueueSet<E extends HeapPriorityQueueElement>
 		flushWriteBatch();
 		return new RocksBytesIterator(
 			new RocksIteratorWrapper(
-				db.newIterator(columnFamilyHandle, readOptions)));
+				db.newIterator(columnFamilyHandle)));
 	}
 
 	/**
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java
index 5bce695..1f43dd0 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java
@@ -21,7 +21,6 @@ import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 
 import org.rocksdb.ColumnFamilyHandle;
-import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
 
@@ -117,8 +116,7 @@ public class RocksDBIncrementalCheckpointUtils {
 		@Nonnegative long writeBatchSize) throws RocksDBException {
 
 		for (ColumnFamilyHandle columnFamilyHandle : columnFamilyHandles) {
-			try (ReadOptions readOptions = RocksDBOperationUtils.createTotalOrderSeekReadOptions();
-				RocksIteratorWrapper iteratorWrapper = RocksDBOperationUtils.getRocksIterator(db, columnFamilyHandle, readOptions);
+			try (RocksIteratorWrapper iteratorWrapper = RocksDBOperationUtils.getRocksIterator(db, columnFamilyHandle);
 				RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(db, writeBatchSize)) {
 
 				iteratorWrapper.seek(beginKeyBytes);
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index f0cce0b..61d8688 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -66,7 +66,6 @@ import org.apache.flink.util.StateMigrationException;
 
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.ColumnFamilyOptions;
-import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
 import org.rocksdb.Snapshot;
@@ -151,12 +150,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	private final WriteOptions writeOptions;
 
 	/**
-	 * The read options to use when creating iterators.
-	 * We ensure total order seek in case user misuse, see FLINK-17800 for more details.
-	 */
-	private final ReadOptions readOptions;
-
-	/**
 	 * The max memory size for one batch in {@link RocksDBWriteBatchWrapper}.
 	 */
 	private final long writeBatchSize;
@@ -219,8 +212,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		ExecutionConfig executionConfig,
 		TtlTimeProvider ttlTimeProvider,
 		RocksDB db,
-		WriteOptions writeOptions,
-		ReadOptions readOptions,
 		LinkedHashMap<String, RocksDbKvStateInfo> kvStateInformation,
 		int keyGroupPrefixBytes,
 		CloseableRegistry cancelStreamRegistry,
@@ -259,8 +250,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		this.keyGroupPrefixBytes = keyGroupPrefixBytes;
 		this.kvStateInformation = kvStateInformation;
 
-		this.writeOptions = writeOptions;
-		this.readOptions = readOptions;
+		this.writeOptions = new WriteOptions().setDisableWAL(true);
 		checkArgument(writeBatchSize >= 0, "Write batch size have to be no negative value.");
 		this.writeBatchSize = writeBatchSize;
 		this.db = db;
@@ -300,7 +290,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			throw new FlinkRuntimeException("Failed to get keys from RocksDB state backend.", ex);
 		}
 
-		RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(db, columnInfo.columnFamilyHandle, readOptions);
+		RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(db, columnInfo.columnFamilyHandle);
 		iterator.seekToFirst();
 
 		final RocksStateKeysIterator<K> iteratorWrapper = new RocksStateKeysIterator<>(iterator, state, getKeySerializer(), keyGroupPrefixBytes,
@@ -370,7 +360,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			columnFamilyOptions.forEach(IOUtils::closeQuietly);
 
 			IOUtils.closeQuietly(optionsContainer);
-			IOUtils.closeQuietly(readOptions);
 			IOUtils.closeQuietly(writeOptions);
 
 			ttlCompactFiltersManager.disposeAndClearRegisteredCompactionFactories();
@@ -418,10 +407,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		return writeOptions;
 	}
 
-	public ReadOptions getReadOptions() {
-		return readOptions;
-	}
-
 	RocksDBSerializedCompositeKeyBuilder<K> getSharedRocksKeyBuilder() {
 		return sharedRocksKeyBuilder;
 	}
@@ -621,7 +606,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 		Snapshot rocksDBSnapshot = db.getSnapshot();
 		try (
-			RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(db, stateMetaInfo.f0, readOptions);
+			RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(db, stateMetaInfo.f0);
 			RocksDBWriteBatchWrapper batchWriter = new RocksDBWriteBatchWrapper(db, getWriteOptions(), getWriteBatchSize())
 		) {
 			iterator.seekToFirst();
@@ -696,7 +681,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 		for (RocksDbKvStateInfo metaInfo : kvStateInformation.values()) {
 			//TODO maybe filterOrTransform only for k/v states
-			try (RocksIteratorWrapper rocksIterator = RocksDBOperationUtils.getRocksIterator(db, metaInfo.columnFamilyHandle, readOptions)) {
+			try (RocksIteratorWrapper rocksIterator = RocksDBOperationUtils.getRocksIterator(db, metaInfo.columnFamilyHandle)) {
 				rocksIterator.seekToFirst();
 
 				while (rocksIterator.isValid()) {
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
index b9bdbb8..d1a0184 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
@@ -54,7 +54,6 @@ import org.apache.flink.util.ResourceGuard;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
-import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
 import org.rocksdb.WriteOptions;
 import org.slf4j.Logger;
@@ -245,7 +244,6 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
 		CloseableRegistry cancelStreamRegistryForBackend = new CloseableRegistry();
 		// The write options to use in the states.
 		WriteOptions writeOptions = null;
-		ReadOptions readOptions = null;
 		LinkedHashMap<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation = new LinkedHashMap<>();
 		RocksDB db = null;
 		AbstractRocksDBRestoreOperation restoreOperation = null;
@@ -284,7 +282,6 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
 			}
 
 			writeOptions = new WriteOptions().setDisableWAL(true);
-			readOptions = RocksDBOperationUtils.createTotalOrderSeekReadOptions();
 			writeBatchWrapper = new RocksDBWriteBatchWrapper(db, writeOptions, writeBatchSize);
 			// it is important that we only create the key builder after the restore, and not before;
 			// restore operations may reconfigure the key serializer, so accessing the key serializer
@@ -297,13 +294,8 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
 			snapshotStrategy = initializeSavepointAndCheckpointStrategies(cancelStreamRegistryForBackend, rocksDBResourceGuard,
 				kvStateInformation, keyGroupPrefixBytes, db, backendUID, materializedSstFiles, lastCompletedCheckpointId);
 			// init priority queue factory
-			priorityQueueFactory = initPriorityQueueFactory(
-				keyGroupPrefixBytes,
-				kvStateInformation,
-				db,
-				readOptions,
-				writeBatchWrapper,
-				nativeMetricMonitor);
+			priorityQueueFactory = initPriorityQueueFactory(keyGroupPrefixBytes, kvStateInformation, db,
+				writeBatchWrapper, nativeMetricMonitor);
 		} catch (Throwable e) {
 			// Do clean up
 			List<ColumnFamilyOptions> columnFamilyOptions = new ArrayList<>(kvStateInformation.values().size());
@@ -321,7 +313,6 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
 			IOUtils.closeQuietly(restoreOperation);
 			IOUtils.closeAllQuietly(columnFamilyOptions);
 			IOUtils.closeQuietly(optionsContainer);
-			IOUtils.closeQuietly(readOptions);
 			IOUtils.closeQuietly(writeOptions);
 			ttlCompactFiltersManager.disposeAndClearRegisteredCompactionFactories();
 			kvStateInformation.clear();
@@ -353,8 +344,6 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
 			this.executionConfig,
 			this.ttlTimeProvider,
 			db,
-			writeOptions,
-			readOptions,
 			kvStateInformation,
 			keyGroupPrefixBytes,
 			cancelStreamRegistryForBackend,
@@ -483,7 +472,6 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
 		int keyGroupPrefixBytes,
 		Map<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation,
 		RocksDB db,
-		ReadOptions readOptions,
 		RocksDBWriteBatchWrapper writeBatchWrapper,
 		RocksDBNativeMetricMonitor nativeMetricMonitor) {
 		PriorityQueueSetFactory priorityQueueFactory;
@@ -498,7 +486,6 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
 					numberOfKeyGroups,
 					kvStateInformation,
 					db,
-					readOptions,
 					writeBatchWrapper,
 					nativeMetricMonitor,
 					columnFamilyOptionsFactory
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
index 6c7f541..f0b47cd 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
@@ -236,7 +236,7 @@ class RocksDBMapState<K, N, UK, UV>
 	public boolean isEmpty() {
 		final byte[] prefixBytes = serializeCurrentKeyWithGroupAndNamespace();
 
-		try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(backend.db, columnFamily, backend.getReadOptions())) {
+		try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(backend.db, columnFamily)) {
 
 			iterator.seek(prefixBytes);
 
@@ -247,7 +247,7 @@ class RocksDBMapState<K, N, UK, UV>
 	@Override
 	public void clear() {
 		try {
-			try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(backend.db, columnFamily, backend.getReadOptions());
+			try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(backend.db, columnFamily);
 				RocksDBWriteBatchWrapper rocksDBWriteBatchWrapper = new RocksDBWriteBatchWrapper(backend.db, backend.getWriteOptions(), backend.getWriteBatchSize())) {
 
 				final byte[] keyPrefixBytes = serializeCurrentKeyWithGroupAndNamespace();
@@ -570,7 +570,7 @@ class RocksDBMapState<K, N, UK, UV>
 
 			// use try-with-resources to ensure RocksIterator can be release even some runtime exception
 			// occurred in the below code block.
-			try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(db, columnFamily, backend.getReadOptions())) {
+			try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(db, columnFamily)) {
 
 				/*
 				 * The iteration starts from the prefix bytes at the first loading. After #nextEntry() is called,
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java
index 0f564d5..1455c1b 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java
@@ -32,7 +32,6 @@ import org.rocksdb.ColumnFamilyDescriptor;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
-import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
 import org.slf4j.Logger;
@@ -95,18 +94,12 @@ public class RocksDBOperationUtils {
 		return dbRef;
 	}
 
-	public static RocksIteratorWrapper getRocksIterator(RocksDB db, ColumnFamilyHandle columnFamilyHandle, ReadOptions readOptions) {
-		return new RocksIteratorWrapper(db.newIterator(columnFamilyHandle, readOptions));
+	public static RocksIteratorWrapper getRocksIterator(RocksDB db) {
+		return new RocksIteratorWrapper(db.newIterator());
 	}
 
-	/**
-	 * Create a total order read option to avoid user misuse, see FLINK-17800 for more details.
-	 *
-	 * <p>Note, remember to close the generated {@link ReadOptions} when dispose.
-	 */
-	// TODO We would remove this method once we bump RocksDB version larger than 6.2.2.
-	public static ReadOptions createTotalOrderSeekReadOptions() {
-		return new ReadOptions().setTotalOrderSeek(true);
+	public static RocksIteratorWrapper getRocksIterator(RocksDB db, ColumnFamilyHandle columnFamilyHandle) {
+		return new RocksIteratorWrapper(db.newIterator(columnFamilyHandle));
 	}
 
 	public static void registerKvStateInformation(
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBPriorityQueueSetFactory.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBPriorityQueueSetFactory.java
index 59daff8..1cbeebe 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBPriorityQueueSetFactory.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBPriorityQueueSetFactory.java
@@ -17,7 +17,6 @@
 
 package org.apache.flink.contrib.streaming.state;
 
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
 import org.apache.flink.core.memory.DataInputDeserializer;
@@ -37,7 +36,6 @@ import org.apache.flink.util.StateMigrationException;
 
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.ColumnFamilyOptions;
-import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
 
 import javax.annotation.Nonnull;
@@ -54,8 +52,7 @@ public class RocksDBPriorityQueueSetFactory implements PriorityQueueSetFactory {
 	/**
 	 * Default cache size per key-group.
 	 */
-	@VisibleForTesting
-	static final int DEFAULT_CACHES_SIZE = 128; //TODO make this configurable
+	private static final int DEFAULT_CACHES_SIZE = 128; //TODO make this configurable
 
 	/**
 	 * A shared buffer to serialize elements for the priority queue.
@@ -74,7 +71,6 @@ public class RocksDBPriorityQueueSetFactory implements PriorityQueueSetFactory {
 	private final int numberOfKeyGroups;
 	private final Map<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation;
 	private final RocksDB db;
-	private final ReadOptions readOptions;
 	private final RocksDBWriteBatchWrapper writeBatchWrapper;
 	private final RocksDBNativeMetricMonitor nativeMetricMonitor;
 	private final Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory;
@@ -85,7 +81,6 @@ public class RocksDBPriorityQueueSetFactory implements PriorityQueueSetFactory {
 		int numberOfKeyGroups,
 		Map<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation,
 		RocksDB db,
-		ReadOptions readOptions,
 		RocksDBWriteBatchWrapper writeBatchWrapper,
 		RocksDBNativeMetricMonitor nativeMetricMonitor,
 		Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory) {
@@ -94,7 +89,6 @@ public class RocksDBPriorityQueueSetFactory implements PriorityQueueSetFactory {
 		this.numberOfKeyGroups = numberOfKeyGroups;
 		this.kvStateInformation = kvStateInformation;
 		this.db = db;
-		this.readOptions = readOptions;
 		this.writeBatchWrapper = writeBatchWrapper;
 		this.nativeMetricMonitor = nativeMetricMonitor;
 		this.columnFamilyOptionsFactory = columnFamilyOptionsFactory;
@@ -128,7 +122,6 @@ public class RocksDBPriorityQueueSetFactory implements PriorityQueueSetFactory {
 						keyGroupId,
 						keyGroupPrefixBytes,
 						db,
-						readOptions,
 						columnFamilyHandle,
 						byteOrderedElementSerializer,
 						sharedElementOutView,
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
index 53c7537..f7abed6 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
@@ -51,7 +51,6 @@ import org.rocksdb.ColumnFamilyDescriptor;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
-import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
 import org.slf4j.Logger;
@@ -310,7 +309,7 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor
 						null, tmpRestoreDBInfo.stateMetaInfoSnapshots.get(i))
 						.columnFamilyHandle;
 
-					try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(tmpRestoreDBInfo.db, tmpColumnFamilyHandle, tmpRestoreDBInfo.readOptions)) {
+					try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(tmpRestoreDBInfo.db, tmpColumnFamilyHandle)) {
 
 						iterator.seek(startKeyGroupPrefixBytes);
 
@@ -377,8 +376,6 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor
 		@Nonnull
 		private final List<StateMetaInfoSnapshot> stateMetaInfoSnapshots;
 
-		private final ReadOptions readOptions;
-
 		private RestoredDBInstance(
 			@Nonnull RocksDB db,
 			@Nonnull List<ColumnFamilyHandle> columnFamilyHandles,
@@ -389,7 +386,6 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor
 			this.columnFamilyHandles = columnFamilyHandles;
 			this.columnFamilyDescriptors = columnFamilyDescriptors;
 			this.stateMetaInfoSnapshots = stateMetaInfoSnapshots;
-			this.readOptions = RocksDBOperationUtils.createTotalOrderSeekReadOptions();
 		}
 
 		@Override
@@ -401,7 +397,6 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor
 			IOUtils.closeAllQuietly(columnFamilyHandles);
 			IOUtils.closeQuietly(db);
 			IOUtils.closeAllQuietly(columnFamilyOptions);
-			IOUtils.closeQuietly(readOptions);
 		}
 	}
 
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/KeyGroupPartitionedPriorityQueueWithRocksDBStoreTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/KeyGroupPartitionedPriorityQueueWithRocksDBStoreTest.java
index fd68d70..d402c3d 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/KeyGroupPartitionedPriorityQueueWithRocksDBStoreTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/KeyGroupPartitionedPriorityQueueWithRocksDBStoreTest.java
@@ -60,7 +60,6 @@ public class KeyGroupPartitionedPriorityQueueWithRocksDBStoreTest extends Intern
 				keyGroupId,
 				keyGroupPrefixBytes,
 				rocksDBResource.getRocksDB(),
-				rocksDBResource.getReadOptions(),
 				rocksDBResource.getDefaultColumnFamily(),
 				TestElementSerializer.INSTANCE,
 				outputStreamWithPos,
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResource.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResource.java
index 3b3b697..d25baa7 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResource.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResource.java
@@ -101,7 +101,7 @@ public class RocksDBResource extends ExternalResource {
 					LOG.error("Close previous ColumnOptions's instance failed.", e);
 				}
 
-				return PredefinedOptions.FLASH_SSD_OPTIMIZED.createColumnOptions(handlesToClose).optimizeForPointLookup(40960);
+				return PredefinedOptions.FLASH_SSD_OPTIMIZED.createColumnOptions(handlesToClose);
 			}
 		});
 	}
@@ -155,7 +155,7 @@ public class RocksDBResource extends ExternalResource {
 			PredefinedOptions.DEFAULT.createColumnOptions(handlesToClose), handlesToClose);
 		this.writeOptions = new WriteOptions();
 		this.writeOptions.disableWAL();
-		this.readOptions = RocksDBOperationUtils.createTotalOrderSeekReadOptions();
+		this.readOptions = new ReadOptions();
 		this.columnFamilyHandles = new ArrayList<>(1);
 		this.rocksDB = RocksDB.open(
 			dbOptions,
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksStateKeysIteratorTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksStateKeysIteratorTest.java
index 1a4808f..8f71b65 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksStateKeysIteratorTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksStateKeysIteratorTest.java
@@ -126,7 +126,7 @@ public class RocksDBRocksStateKeysIteratorTest {
 			ColumnFamilyHandle handle = keyedStateBackend.getColumnFamilyHandle(testStateName);
 
 			try (
-				RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(keyedStateBackend.db, handle, keyedStateBackend.getReadOptions());
+				RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(keyedStateBackend.db, handle);
 				RocksStateKeysIterator<K> iteratorWrapper =
 					new RocksStateKeysIterator<>(
 						iterator,
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
index 197f7ca..d3eb6db 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
@@ -61,7 +61,6 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 
-import static org.apache.flink.contrib.streaming.state.RocksDBTestUtils.createKeyedStateBackend;
 import static org.hamcrest.CoreMatchers.anyOf;
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -124,7 +123,7 @@ public class RocksDBStateBackendConfigTest {
 		assertArrayEquals(new String[] { testDir1, testDir2 }, rocksDbBackend.getDbStoragePaths());
 
 		final MockEnvironment env = getMockEnvironment(tempFolder.newFolder());
-		final RocksDBKeyedStateBackend<Integer> keyedBackend = createKeyedStateBackend(rocksDbBackend, env, IntSerializer.INSTANCE);
+		final RocksDBKeyedStateBackend<Integer> keyedBackend = createKeyedStateBackend(rocksDbBackend, env);
 
 		try {
 			File instanceBasePath = keyedBackend.getInstanceBasePath();
@@ -161,7 +160,7 @@ public class RocksDBStateBackendConfigTest {
 
 		RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(tempFolder.newFolder().toURI().toString());
 
-		RocksDBKeyedStateBackend<Integer> keyedBackend = createKeyedStateBackend(rocksDbBackend, env, IntSerializer.INSTANCE);
+		RocksDBKeyedStateBackend<Integer> keyedBackend = createKeyedStateBackend(rocksDbBackend, env);
 		Assert.assertEquals(RocksDBPriorityQueueSetFactory.class, keyedBackend.getPriorityQueueFactory().getClass());
 		keyedBackend.dispose();
 
@@ -169,7 +168,7 @@ public class RocksDBStateBackendConfigTest {
 		conf.set(RocksDBOptions.TIMER_SERVICE_FACTORY, RocksDBStateBackend.PriorityQueueStateType.HEAP);
 
 		rocksDbBackend = rocksDbBackend.configure(conf, Thread.currentThread().getContextClassLoader());
-		keyedBackend = createKeyedStateBackend(rocksDbBackend, env, IntSerializer.INSTANCE);
+		keyedBackend = createKeyedStateBackend(rocksDbBackend, env);
 		Assert.assertEquals(
 			HeapPriorityQueueSetFactory.class,
 			keyedBackend.getPriorityQueueFactory().getClass());
@@ -198,7 +197,7 @@ public class RocksDBStateBackendConfigTest {
 		final RocksDBStateBackend configuredRocksDBStateBackend = backend.configure(
 			configFromConfFile,
 			Thread.currentThread().getContextClassLoader());
-		final RocksDBKeyedStateBackend<Integer> keyedBackend = createKeyedStateBackend(configuredRocksDBStateBackend, env, IntSerializer.INSTANCE);
+		final RocksDBKeyedStateBackend<Integer> keyedBackend = createKeyedStateBackend(configuredRocksDBStateBackend, env);
 
 		// priorityQueueStateType of the job backend should be preserved
 		assertThat(keyedBackend.getPriorityQueueFactory(), instanceOf(HeapPriorityQueueSetFactory.class));
@@ -255,7 +254,7 @@ public class RocksDBStateBackendConfigTest {
 		rocksDbBackend.setDbStoragePath(configuredPath);
 
 		final MockEnvironment env = getMockEnvironment(tempFolder.newFolder());
-		RocksDBKeyedStateBackend<Integer> keyedBackend = createKeyedStateBackend(rocksDbBackend, env, IntSerializer.INSTANCE);
+		RocksDBKeyedStateBackend<Integer> keyedBackend = createKeyedStateBackend(rocksDbBackend, env);
 
 		try {
 			File instanceBasePath = keyedBackend.getInstanceBasePath();
@@ -726,6 +725,23 @@ public class RocksDBStateBackendConfigTest {
 	//  Utilities
 	// ------------------------------------------------------------------------
 
+	static RocksDBKeyedStateBackend<Integer> createKeyedStateBackend(
+			RocksDBStateBackend rocksDbBackend, Environment env) throws Exception {
+
+		return (RocksDBKeyedStateBackend<Integer>) rocksDbBackend.createKeyedStateBackend(
+			env,
+			env.getJobID(),
+			"test_op",
+			IntSerializer.INSTANCE,
+			1,
+			new KeyGroupRange(0, 0),
+			env.getTaskKvStateRegistry(),
+			TtlTimeProvider.DEFAULT,
+			new UnregisteredMetricsGroup(),
+			Collections.emptyList(),
+			new CloseableRegistry());
+	}
+
 	static MockEnvironment getMockEnvironment(File... tempDirs) {
 		final String[] tempDirStrings = new String[tempDirs.length];
 		for (int i = 0; i < tempDirs.length; i++) {
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateMisuseOptionTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateMisuseOptionTest.java
deleted file mode 100644
index 59a4822..0000000
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateMisuseOptionTest.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.contrib.streaming.state;
-
-import org.apache.flink.api.common.state.MapState;
-import org.apache.flink.api.common.state.MapStateDescriptor;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.common.typeutils.base.LongSerializer;
-import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
-import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
-import org.apache.flink.runtime.state.VoidNamespace;
-import org.apache.flink.runtime.state.VoidNamespaceSerializer;
-import org.apache.flink.streaming.api.operators.TimerHeapInternalTimer;
-import org.apache.flink.streaming.api.operators.TimerSerializer;
-
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.rocksdb.ColumnFamilyOptions;
-import org.rocksdb.DBOptions;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.PriorityQueue;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
-import static org.apache.flink.contrib.streaming.state.RocksDBTestUtils.createKeyedStateBackend;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Tests to cover cases that even user misuse some options, RocksDB state-backend could still work as expected or give explicit feedback.
- *
- * <p>RocksDB state-backend has some internal operations based on RocksDB's APIs which is transparent for users.
- * However, user could still configure options via {@link RocksDBOptionsFactory}, and might lead some operations
- * could not get expected result, e.g. FLINK-17800
- */
-public class RocksDBStateMisuseOptionTest {
-
-	@Rule
-	public final TemporaryFolder tempFolder = new TemporaryFolder();
-
-	/**
-	 * Tests to cover case when user misuse optimizeForPointLookup with iterator interfaces on map state.
-	 *
-	 * <p>The option {@link ColumnFamilyOptions#optimizeForPointLookup(long)} would lead to iterator.seek with prefix bytes invalid.
-	 */
-	@Test
-	public void testMisuseOptimizePointLookupWithMapState() throws Exception {
-		RocksDBStateBackend rocksDBStateBackend = createStateBackendWithOptimizePointLookup();
-		RocksDBKeyedStateBackend<Integer> keyedStateBackend = createKeyedStateBackend(rocksDBStateBackend, new MockEnvironmentBuilder().build(), IntSerializer.INSTANCE);
-		MapStateDescriptor<Integer, Long> stateDescriptor = new MapStateDescriptor<>("map", IntSerializer.INSTANCE, LongSerializer.INSTANCE);
-		MapState<Integer, Long> mapState = keyedStateBackend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, stateDescriptor);
-
-		keyedStateBackend.setCurrentKey(1);
-		Map<Integer, Long> expectedResult = new HashMap<>();
-		for (int i = 0; i < 100; i++) {
-			long uv = ThreadLocalRandom.current().nextLong();
-			mapState.put(i, uv);
-			expectedResult.put(i, uv);
-		}
-
-		Iterator<Map.Entry<Integer, Long>> iterator = mapState.entries().iterator();
-		while (iterator.hasNext()) {
-			Map.Entry<Integer, Long> entry = iterator.next();
-			assertEquals(entry.getValue(), expectedResult.remove(entry.getKey()));
-			iterator.remove();
-		}
-		assertTrue(expectedResult.isEmpty());
-		assertTrue(mapState.isEmpty());
-	}
-
-	/**
-	 * Tests to cover case when user misuse optimizeForPointLookup with peek operations on priority queue.
-	 *
-	 * <p>The option {@link ColumnFamilyOptions#optimizeForPointLookup(long)} would lead to iterator.seek with prefix bytes invalid.
-	 */
-	@Test
-	public void testMisuseOptimizePointLookupWithPriorityQueue() throws IOException {
-		RocksDBStateBackend rocksDBStateBackend = createStateBackendWithOptimizePointLookup();
-		RocksDBKeyedStateBackend<Integer> keyedStateBackend = createKeyedStateBackend(rocksDBStateBackend, new MockEnvironmentBuilder().build(), IntSerializer.INSTANCE);
-		KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<Integer, VoidNamespace>> priorityQueue =
-			keyedStateBackend.create("timer", new TimerSerializer<>(keyedStateBackend.getKeySerializer(), VoidNamespaceSerializer.INSTANCE));
-
-		PriorityQueue<TimerHeapInternalTimer<Integer, VoidNamespace>> expectedPriorityQueue = new PriorityQueue<>((o1, o2) -> (int) (o1.getTimestamp() - o2.getTimestamp()));
-		// ensure we insert timers more than cache capacity.
-		int queueSize = RocksDBPriorityQueueSetFactory.DEFAULT_CACHES_SIZE + 42;
-		List<Integer> timeStamps = IntStream.range(0, queueSize).boxed().collect(Collectors.toList());
-		Collections.shuffle(timeStamps);
-		for (Integer timeStamp : timeStamps) {
-			TimerHeapInternalTimer<Integer, VoidNamespace> timer = new TimerHeapInternalTimer<>(timeStamp, timeStamp, VoidNamespace.INSTANCE);
-			priorityQueue.add(timer);
-			expectedPriorityQueue.add(timer);
-		}
-		assertEquals(queueSize, priorityQueue.size());
-		TimerHeapInternalTimer<Integer, VoidNamespace> timer;
-		while ((timer = priorityQueue.poll()) != null) {
-			assertEquals(expectedPriorityQueue.poll(), timer);
-		}
-		assertTrue(expectedPriorityQueue.isEmpty());
-		assertTrue(priorityQueue.isEmpty());
-
-	}
-
-	private RocksDBStateBackend createStateBackendWithOptimizePointLookup() throws IOException {
-		RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(tempFolder.newFolder().toURI(), true);
-		rocksDBStateBackend.setPriorityQueueStateType(RocksDBStateBackend.PriorityQueueStateType.ROCKSDB);
-		rocksDBStateBackend.setRocksDBOptions(new RocksDBOptionsFactory() {
-
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public DBOptions createDBOptions(DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
-				return currentOptions;
-			}
-
-			@Override
-			public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
-				return currentOptions.optimizeForPointLookup(64);
-			}
-		});
-		return rocksDBStateBackend;
-	}
-}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBTestUtils.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBTestUtils.java
index cee56aa..ed44a73 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBTestUtils.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBTestUtils.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
-import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.state.KeyGroupRange;
@@ -36,7 +35,6 @@ import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.RocksDB;
 
 import java.io.File;
-import java.io.IOException;
 import java.util.Collections;
 
 /**
@@ -100,23 +98,4 @@ public final class RocksDBTestUtils {
 				defaultCFHandle,
 				new CloseableRegistry());
 	}
-
-	public static <K> RocksDBKeyedStateBackend<K> createKeyedStateBackend(
-			RocksDBStateBackend rocksDbBackend,
-			Environment env,
-			TypeSerializer<K> keySerializer) throws IOException {
-
-		return (RocksDBKeyedStateBackend<K>) rocksDbBackend.createKeyedStateBackend(
-			env,
-			env.getJobID(),
-			"test_op",
-			keySerializer,
-			1,
-			new KeyGroupRange(0, 0),
-			env.getTaskKvStateRegistry(),
-			TtlTimeProvider.DEFAULT,
-			new UnregisteredMetricsGroup(),
-			Collections.emptyList(),
-			new CloseableRegistry());
-	}
 }
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksKeyGroupsRocksSingleStateIteratorTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksKeyGroupsRocksSingleStateIteratorTest.java
index 2c73fc7..4447f08 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksKeyGroupsRocksSingleStateIteratorTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksKeyGroupsRocksSingleStateIteratorTest.java
@@ -30,7 +30,6 @@ import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 import org.rocksdb.ColumnFamilyDescriptor;
 import org.rocksdb.ColumnFamilyHandle;
-import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
 
 import java.io.DataOutputStream;
@@ -75,8 +74,7 @@ public class RocksKeyGroupsRocksSingleStateIteratorTest {
 	public void testMergeIterator(int maxParallelism) throws Exception {
 		Random random = new Random(1234);
 
-		try (ReadOptions readOptions = new ReadOptions();
-			RocksDB rocksDB = RocksDB.open(tempFolder.getRoot().getAbsolutePath())) {
+		try (RocksDB rocksDB = RocksDB.open(tempFolder.getRoot().getAbsolutePath())) {
 			List<Tuple2<RocksIteratorWrapper, Integer>> rocksIteratorsWithKVStateId = new ArrayList<>();
 			List<Tuple2<ColumnFamilyHandle, Integer>> columnFamilyHandlesWithKeyCount = new ArrayList<>();
 
@@ -110,7 +108,7 @@ public class RocksKeyGroupsRocksSingleStateIteratorTest {
 
 			int id = 0;
 			for (Tuple2<ColumnFamilyHandle, Integer> columnFamilyHandle : columnFamilyHandlesWithKeyCount) {
-				rocksIteratorsWithKVStateId.add(new Tuple2<>(RocksDBOperationUtils.getRocksIterator(rocksDB, columnFamilyHandle.f0, readOptions), id));
+				rocksIteratorsWithKVStateId.add(new Tuple2<>(RocksDBOperationUtils.getRocksIterator(rocksDB, columnFamilyHandle.f0), id));
 				++id;
 			}
 
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java
index ce06d1f..96003d6 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.contrib.streaming.state.benchmark;
 
 import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
+import org.apache.flink.contrib.streaming.state.RocksDBOperationUtils;
 import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper;
 import org.apache.flink.core.memory.MemoryUtils;
 import org.apache.flink.testutils.junit.RetryOnFailure;
@@ -166,7 +167,7 @@ public class RocksDBPerformanceTest extends TestLogger {
 
 			int pos = 0;
 
-			try (final RocksIteratorWrapper iterator = new RocksIteratorWrapper(rocksDB.newIterator())) {
+			try (final RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(rocksDB)) {
 				// seek to start
 				unsafe.putInt(keyTemplate, offset, 0);
 				iterator.seek(keyTemplate);