You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by li...@apache.org on 2020/06/16 08:31:24 UTC
[flink] 01/02: [FLINK-17800][rocksdb] Ensure total order seek to
avoid user misuse
This is an automated email from the ASF dual-hosted git repository.
liyu pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 5c0de8d6d5a9eea1a779cf3703412f522bece54c
Author: Yun Tang <my...@live.com>
AuthorDate: Fri Jun 5 01:16:27 2020 +0800
[FLINK-17800][rocksdb] Ensure total order seek to avoid user misuse
---
.../state/RocksDBCachingPriorityQueueSet.java | 8 +-
.../state/RocksDBIncrementalCheckpointUtils.java | 4 +-
.../streaming/state/RocksDBKeyedStateBackend.java | 23 +++-
.../state/RocksDBKeyedStateBackendBuilder.java | 17 ++-
.../contrib/streaming/state/RocksDBMapState.java | 6 +-
.../streaming/state/RocksDBOperationUtils.java | 15 ++-
.../state/RocksDBPriorityQueueSetFactory.java | 9 +-
.../RocksDBIncrementalRestoreOperation.java | 7 +-
...rtitionedPriorityQueueWithRocksDBStoreTest.java | 1 +
.../contrib/streaming/state/RocksDBResource.java | 4 +-
.../state/RocksDBRocksStateKeysIteratorTest.java | 2 +-
.../state/RocksDBStateBackendConfigTest.java | 28 +---
.../state/RocksDBStateMisuseOptionTest.java | 147 +++++++++++++++++++++
.../contrib/streaming/state/RocksDBTestUtils.java | 21 +++
...RocksKeyGroupsRocksSingleStateIteratorTest.java | 6 +-
.../state/benchmark/RocksDBPerformanceTest.java | 3 +-
16 files changed, 255 insertions(+), 46 deletions(-)
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBCachingPriorityQueueSet.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBCachingPriorityQueueSet.java
index 364185a..fb9a833 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBCachingPriorityQueueSet.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBCachingPriorityQueueSet.java
@@ -29,6 +29,7 @@ import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.shaded.guava18.com.google.common.primitives.UnsignedBytes;
import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
@@ -63,6 +64,9 @@ public class RocksDBCachingPriorityQueueSet<E extends HeapPriorityQueueElement>
@Nonnull
private final RocksDB db;
+ @Nonnull
+ private final ReadOptions readOptions;
+
/** Handle to the column family of the RocksDB instance in which the elements are stored. */
@Nonnull
private final ColumnFamilyHandle columnFamilyHandle;
@@ -112,6 +116,7 @@ public class RocksDBCachingPriorityQueueSet<E extends HeapPriorityQueueElement>
@Nonnegative int keyGroupId,
@Nonnegative int keyGroupPrefixBytes,
@Nonnull RocksDB db,
+ @Nonnull ReadOptions readOptions,
@Nonnull ColumnFamilyHandle columnFamilyHandle,
@Nonnull TypeSerializer<E> byteOrderProducingSerializer,
@Nonnull DataOutputSerializer outputStream,
@@ -119,6 +124,7 @@ public class RocksDBCachingPriorityQueueSet<E extends HeapPriorityQueueElement>
@Nonnull RocksDBWriteBatchWrapper batchWrapper,
@Nonnull OrderedByteArraySetCache orderedByteArraySetCache) {
this.db = db;
+ this.readOptions = readOptions;
this.columnFamilyHandle = columnFamilyHandle;
this.byteOrderProducingSerializer = byteOrderProducingSerializer;
this.batchWrapper = batchWrapper;
@@ -304,7 +310,7 @@ public class RocksDBCachingPriorityQueueSet<E extends HeapPriorityQueueElement>
flushWriteBatch();
return new RocksBytesIterator(
new RocksIteratorWrapper(
- db.newIterator(columnFamilyHandle)));
+ db.newIterator(columnFamilyHandle, readOptions)));
}
/**
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java
index 1f43dd0..5bce695 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java
@@ -21,6 +21,7 @@ import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
@@ -116,7 +117,8 @@ public class RocksDBIncrementalCheckpointUtils {
@Nonnegative long writeBatchSize) throws RocksDBException {
for (ColumnFamilyHandle columnFamilyHandle : columnFamilyHandles) {
- try (RocksIteratorWrapper iteratorWrapper = RocksDBOperationUtils.getRocksIterator(db, columnFamilyHandle);
+ try (ReadOptions readOptions = RocksDBOperationUtils.createTotalOrderSeekReadOptions();
+ RocksIteratorWrapper iteratorWrapper = RocksDBOperationUtils.getRocksIterator(db, columnFamilyHandle, readOptions);
RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(db, writeBatchSize)) {
iteratorWrapper.seek(beginKeyBytes);
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 61d8688..f0cce0b 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -66,6 +66,7 @@ import org.apache.flink.util.StateMigrationException;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.Snapshot;
@@ -150,6 +151,12 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
private final WriteOptions writeOptions;
/**
+ * The read options to use when creating iterators.
+ * We enforce total order seek to guard against user misuse; see FLINK-17800 for more details.
+ */
+ private final ReadOptions readOptions;
+
+ /**
* The max memory size for one batch in {@link RocksDBWriteBatchWrapper}.
*/
private final long writeBatchSize;
@@ -212,6 +219,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
ExecutionConfig executionConfig,
TtlTimeProvider ttlTimeProvider,
RocksDB db,
+ WriteOptions writeOptions,
+ ReadOptions readOptions,
LinkedHashMap<String, RocksDbKvStateInfo> kvStateInformation,
int keyGroupPrefixBytes,
CloseableRegistry cancelStreamRegistry,
@@ -250,7 +259,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
this.keyGroupPrefixBytes = keyGroupPrefixBytes;
this.kvStateInformation = kvStateInformation;
- this.writeOptions = new WriteOptions().setDisableWAL(true);
+ this.writeOptions = writeOptions;
+ this.readOptions = readOptions;
checkArgument(writeBatchSize >= 0, "Write batch size have to be no negative value.");
this.writeBatchSize = writeBatchSize;
this.db = db;
@@ -290,7 +300,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
throw new FlinkRuntimeException("Failed to get keys from RocksDB state backend.", ex);
}
- RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(db, columnInfo.columnFamilyHandle);
+ RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(db, columnInfo.columnFamilyHandle, readOptions);
iterator.seekToFirst();
final RocksStateKeysIterator<K> iteratorWrapper = new RocksStateKeysIterator<>(iterator, state, getKeySerializer(), keyGroupPrefixBytes,
@@ -360,6 +370,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
columnFamilyOptions.forEach(IOUtils::closeQuietly);
IOUtils.closeQuietly(optionsContainer);
+ IOUtils.closeQuietly(readOptions);
IOUtils.closeQuietly(writeOptions);
ttlCompactFiltersManager.disposeAndClearRegisteredCompactionFactories();
@@ -407,6 +418,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
return writeOptions;
}
+ public ReadOptions getReadOptions() {
+ return readOptions;
+ }
+
RocksDBSerializedCompositeKeyBuilder<K> getSharedRocksKeyBuilder() {
return sharedRocksKeyBuilder;
}
@@ -606,7 +621,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
Snapshot rocksDBSnapshot = db.getSnapshot();
try (
- RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(db, stateMetaInfo.f0);
+ RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(db, stateMetaInfo.f0, readOptions);
RocksDBWriteBatchWrapper batchWriter = new RocksDBWriteBatchWrapper(db, getWriteOptions(), getWriteBatchSize())
) {
iterator.seekToFirst();
@@ -681,7 +696,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
for (RocksDbKvStateInfo metaInfo : kvStateInformation.values()) {
//TODO maybe filterOrTransform only for k/v states
- try (RocksIteratorWrapper rocksIterator = RocksDBOperationUtils.getRocksIterator(db, metaInfo.columnFamilyHandle)) {
+ try (RocksIteratorWrapper rocksIterator = RocksDBOperationUtils.getRocksIterator(db, metaInfo.columnFamilyHandle, readOptions)) {
rocksIterator.seekToFirst();
while (rocksIterator.isValid()) {
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
index d1a0184..b9bdbb8 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
@@ -54,6 +54,7 @@ import org.apache.flink.util.ResourceGuard;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
+import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.WriteOptions;
import org.slf4j.Logger;
@@ -244,6 +245,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
CloseableRegistry cancelStreamRegistryForBackend = new CloseableRegistry();
// The write options to use in the states.
WriteOptions writeOptions = null;
+ ReadOptions readOptions = null;
LinkedHashMap<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation = new LinkedHashMap<>();
RocksDB db = null;
AbstractRocksDBRestoreOperation restoreOperation = null;
@@ -282,6 +284,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
}
writeOptions = new WriteOptions().setDisableWAL(true);
+ readOptions = RocksDBOperationUtils.createTotalOrderSeekReadOptions();
writeBatchWrapper = new RocksDBWriteBatchWrapper(db, writeOptions, writeBatchSize);
// it is important that we only create the key builder after the restore, and not before;
// restore operations may reconfigure the key serializer, so accessing the key serializer
@@ -294,8 +297,13 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
snapshotStrategy = initializeSavepointAndCheckpointStrategies(cancelStreamRegistryForBackend, rocksDBResourceGuard,
kvStateInformation, keyGroupPrefixBytes, db, backendUID, materializedSstFiles, lastCompletedCheckpointId);
// init priority queue factory
- priorityQueueFactory = initPriorityQueueFactory(keyGroupPrefixBytes, kvStateInformation, db,
- writeBatchWrapper, nativeMetricMonitor);
+ priorityQueueFactory = initPriorityQueueFactory(
+ keyGroupPrefixBytes,
+ kvStateInformation,
+ db,
+ readOptions,
+ writeBatchWrapper,
+ nativeMetricMonitor);
} catch (Throwable e) {
// Do clean up
List<ColumnFamilyOptions> columnFamilyOptions = new ArrayList<>(kvStateInformation.values().size());
@@ -313,6 +321,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
IOUtils.closeQuietly(restoreOperation);
IOUtils.closeAllQuietly(columnFamilyOptions);
IOUtils.closeQuietly(optionsContainer);
+ IOUtils.closeQuietly(readOptions);
IOUtils.closeQuietly(writeOptions);
ttlCompactFiltersManager.disposeAndClearRegisteredCompactionFactories();
kvStateInformation.clear();
@@ -344,6 +353,8 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
this.executionConfig,
this.ttlTimeProvider,
db,
+ writeOptions,
+ readOptions,
kvStateInformation,
keyGroupPrefixBytes,
cancelStreamRegistryForBackend,
@@ -472,6 +483,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
int keyGroupPrefixBytes,
Map<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation,
RocksDB db,
+ ReadOptions readOptions,
RocksDBWriteBatchWrapper writeBatchWrapper,
RocksDBNativeMetricMonitor nativeMetricMonitor) {
PriorityQueueSetFactory priorityQueueFactory;
@@ -486,6 +498,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
numberOfKeyGroups,
kvStateInformation,
db,
+ readOptions,
writeBatchWrapper,
nativeMetricMonitor,
columnFamilyOptionsFactory
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
index f0b47cd..6c7f541 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
@@ -236,7 +236,7 @@ class RocksDBMapState<K, N, UK, UV>
public boolean isEmpty() {
final byte[] prefixBytes = serializeCurrentKeyWithGroupAndNamespace();
- try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(backend.db, columnFamily)) {
+ try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(backend.db, columnFamily, backend.getReadOptions())) {
iterator.seek(prefixBytes);
@@ -247,7 +247,7 @@ class RocksDBMapState<K, N, UK, UV>
@Override
public void clear() {
try {
- try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(backend.db, columnFamily);
+ try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(backend.db, columnFamily, backend.getReadOptions());
RocksDBWriteBatchWrapper rocksDBWriteBatchWrapper = new RocksDBWriteBatchWrapper(backend.db, backend.getWriteOptions(), backend.getWriteBatchSize())) {
final byte[] keyPrefixBytes = serializeCurrentKeyWithGroupAndNamespace();
@@ -570,7 +570,7 @@ class RocksDBMapState<K, N, UK, UV>
// use try-with-resources to ensure RocksIterator can be release even some runtime exception
// occurred in the below code block.
- try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(db, columnFamily)) {
+ try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(db, columnFamily, backend.getReadOptions())) {
/*
* The iteration starts from the prefix bytes at the first loading. After #nextEntry() is called,
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java
index 1455c1b..0f564d5 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java
@@ -32,6 +32,7 @@ import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
+import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.slf4j.Logger;
@@ -94,12 +95,18 @@ public class RocksDBOperationUtils {
return dbRef;
}
- public static RocksIteratorWrapper getRocksIterator(RocksDB db) {
- return new RocksIteratorWrapper(db.newIterator());
+ public static RocksIteratorWrapper getRocksIterator(RocksDB db, ColumnFamilyHandle columnFamilyHandle, ReadOptions readOptions) {
+ return new RocksIteratorWrapper(db.newIterator(columnFamilyHandle, readOptions));
}
- public static RocksIteratorWrapper getRocksIterator(RocksDB db, ColumnFamilyHandle columnFamilyHandle) {
- return new RocksIteratorWrapper(db.newIterator(columnFamilyHandle));
+ /**
+ * Creates a {@link ReadOptions} with total order seek enabled to guard against user misuse; see FLINK-17800 for more details.
+ *
+ * <p>Note: remember to close the returned {@link ReadOptions} on dispose.
+ */
+ // TODO Remove this method once we bump the RocksDB version above 6.2.2.
+ public static ReadOptions createTotalOrderSeekReadOptions() {
+ return new ReadOptions().setTotalOrderSeek(true);
}
public static void registerKvStateInformation(
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBPriorityQueueSetFactory.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBPriorityQueueSetFactory.java
index 1cbeebe..59daff8 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBPriorityQueueSetFactory.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBPriorityQueueSetFactory.java
@@ -17,6 +17,7 @@
package org.apache.flink.contrib.streaming.state;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.core.memory.DataInputDeserializer;
@@ -36,6 +37,7 @@ import org.apache.flink.util.StateMigrationException;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import javax.annotation.Nonnull;
@@ -52,7 +54,8 @@ public class RocksDBPriorityQueueSetFactory implements PriorityQueueSetFactory {
/**
* Default cache size per key-group.
*/
- private static final int DEFAULT_CACHES_SIZE = 128; //TODO make this configurable
+ @VisibleForTesting
+ static final int DEFAULT_CACHES_SIZE = 128; //TODO make this configurable
/**
* A shared buffer to serialize elements for the priority queue.
@@ -71,6 +74,7 @@ public class RocksDBPriorityQueueSetFactory implements PriorityQueueSetFactory {
private final int numberOfKeyGroups;
private final Map<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation;
private final RocksDB db;
+ private final ReadOptions readOptions;
private final RocksDBWriteBatchWrapper writeBatchWrapper;
private final RocksDBNativeMetricMonitor nativeMetricMonitor;
private final Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory;
@@ -81,6 +85,7 @@ public class RocksDBPriorityQueueSetFactory implements PriorityQueueSetFactory {
int numberOfKeyGroups,
Map<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation,
RocksDB db,
+ ReadOptions readOptions,
RocksDBWriteBatchWrapper writeBatchWrapper,
RocksDBNativeMetricMonitor nativeMetricMonitor,
Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory) {
@@ -89,6 +94,7 @@ public class RocksDBPriorityQueueSetFactory implements PriorityQueueSetFactory {
this.numberOfKeyGroups = numberOfKeyGroups;
this.kvStateInformation = kvStateInformation;
this.db = db;
+ this.readOptions = readOptions;
this.writeBatchWrapper = writeBatchWrapper;
this.nativeMetricMonitor = nativeMetricMonitor;
this.columnFamilyOptionsFactory = columnFamilyOptionsFactory;
@@ -122,6 +128,7 @@ public class RocksDBPriorityQueueSetFactory implements PriorityQueueSetFactory {
keyGroupId,
keyGroupPrefixBytes,
db,
+ readOptions,
columnFamilyHandle,
byteOrderedElementSerializer,
sharedElementOutView,
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
index f7abed6..53c7537 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
@@ -51,6 +51,7 @@ import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
+import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.slf4j.Logger;
@@ -309,7 +310,7 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor
null, tmpRestoreDBInfo.stateMetaInfoSnapshots.get(i))
.columnFamilyHandle;
- try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(tmpRestoreDBInfo.db, tmpColumnFamilyHandle)) {
+ try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(tmpRestoreDBInfo.db, tmpColumnFamilyHandle, tmpRestoreDBInfo.readOptions)) {
iterator.seek(startKeyGroupPrefixBytes);
@@ -376,6 +377,8 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor
@Nonnull
private final List<StateMetaInfoSnapshot> stateMetaInfoSnapshots;
+ private final ReadOptions readOptions;
+
private RestoredDBInstance(
@Nonnull RocksDB db,
@Nonnull List<ColumnFamilyHandle> columnFamilyHandles,
@@ -386,6 +389,7 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor
this.columnFamilyHandles = columnFamilyHandles;
this.columnFamilyDescriptors = columnFamilyDescriptors;
this.stateMetaInfoSnapshots = stateMetaInfoSnapshots;
+ this.readOptions = RocksDBOperationUtils.createTotalOrderSeekReadOptions();
}
@Override
@@ -397,6 +401,7 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor
IOUtils.closeAllQuietly(columnFamilyHandles);
IOUtils.closeQuietly(db);
IOUtils.closeAllQuietly(columnFamilyOptions);
+ IOUtils.closeQuietly(readOptions);
}
}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/KeyGroupPartitionedPriorityQueueWithRocksDBStoreTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/KeyGroupPartitionedPriorityQueueWithRocksDBStoreTest.java
index d402c3d..fd68d70 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/KeyGroupPartitionedPriorityQueueWithRocksDBStoreTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/KeyGroupPartitionedPriorityQueueWithRocksDBStoreTest.java
@@ -60,6 +60,7 @@ public class KeyGroupPartitionedPriorityQueueWithRocksDBStoreTest extends Intern
keyGroupId,
keyGroupPrefixBytes,
rocksDBResource.getRocksDB(),
+ rocksDBResource.getReadOptions(),
rocksDBResource.getDefaultColumnFamily(),
TestElementSerializer.INSTANCE,
outputStreamWithPos,
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResource.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResource.java
index d25baa7..3b3b697 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResource.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResource.java
@@ -101,7 +101,7 @@ public class RocksDBResource extends ExternalResource {
LOG.error("Close previous ColumnOptions's instance failed.", e);
}
- return PredefinedOptions.FLASH_SSD_OPTIMIZED.createColumnOptions(handlesToClose);
+ return PredefinedOptions.FLASH_SSD_OPTIMIZED.createColumnOptions(handlesToClose).optimizeForPointLookup(40960);
}
});
}
@@ -155,7 +155,7 @@ public class RocksDBResource extends ExternalResource {
PredefinedOptions.DEFAULT.createColumnOptions(handlesToClose), handlesToClose);
this.writeOptions = new WriteOptions();
this.writeOptions.disableWAL();
- this.readOptions = new ReadOptions();
+ this.readOptions = RocksDBOperationUtils.createTotalOrderSeekReadOptions();
this.columnFamilyHandles = new ArrayList<>(1);
this.rocksDB = RocksDB.open(
dbOptions,
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksStateKeysIteratorTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksStateKeysIteratorTest.java
index 8f71b65..1a4808f 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksStateKeysIteratorTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksStateKeysIteratorTest.java
@@ -126,7 +126,7 @@ public class RocksDBRocksStateKeysIteratorTest {
ColumnFamilyHandle handle = keyedStateBackend.getColumnFamilyHandle(testStateName);
try (
- RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(keyedStateBackend.db, handle);
+ RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(keyedStateBackend.db, handle, keyedStateBackend.getReadOptions());
RocksStateKeysIterator<K> iteratorWrapper =
new RocksStateKeysIterator<>(
iterator,
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
index d3eb6db..197f7ca 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
@@ -61,6 +61,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import static org.apache.flink.contrib.streaming.state.RocksDBTestUtils.createKeyedStateBackend;
import static org.hamcrest.CoreMatchers.anyOf;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -123,7 +124,7 @@ public class RocksDBStateBackendConfigTest {
assertArrayEquals(new String[] { testDir1, testDir2 }, rocksDbBackend.getDbStoragePaths());
final MockEnvironment env = getMockEnvironment(tempFolder.newFolder());
- final RocksDBKeyedStateBackend<Integer> keyedBackend = createKeyedStateBackend(rocksDbBackend, env);
+ final RocksDBKeyedStateBackend<Integer> keyedBackend = createKeyedStateBackend(rocksDbBackend, env, IntSerializer.INSTANCE);
try {
File instanceBasePath = keyedBackend.getInstanceBasePath();
@@ -160,7 +161,7 @@ public class RocksDBStateBackendConfigTest {
RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(tempFolder.newFolder().toURI().toString());
- RocksDBKeyedStateBackend<Integer> keyedBackend = createKeyedStateBackend(rocksDbBackend, env);
+ RocksDBKeyedStateBackend<Integer> keyedBackend = createKeyedStateBackend(rocksDbBackend, env, IntSerializer.INSTANCE);
Assert.assertEquals(RocksDBPriorityQueueSetFactory.class, keyedBackend.getPriorityQueueFactory().getClass());
keyedBackend.dispose();
@@ -168,7 +169,7 @@ public class RocksDBStateBackendConfigTest {
conf.set(RocksDBOptions.TIMER_SERVICE_FACTORY, RocksDBStateBackend.PriorityQueueStateType.HEAP);
rocksDbBackend = rocksDbBackend.configure(conf, Thread.currentThread().getContextClassLoader());
- keyedBackend = createKeyedStateBackend(rocksDbBackend, env);
+ keyedBackend = createKeyedStateBackend(rocksDbBackend, env, IntSerializer.INSTANCE);
Assert.assertEquals(
HeapPriorityQueueSetFactory.class,
keyedBackend.getPriorityQueueFactory().getClass());
@@ -197,7 +198,7 @@ public class RocksDBStateBackendConfigTest {
final RocksDBStateBackend configuredRocksDBStateBackend = backend.configure(
configFromConfFile,
Thread.currentThread().getContextClassLoader());
- final RocksDBKeyedStateBackend<Integer> keyedBackend = createKeyedStateBackend(configuredRocksDBStateBackend, env);
+ final RocksDBKeyedStateBackend<Integer> keyedBackend = createKeyedStateBackend(configuredRocksDBStateBackend, env, IntSerializer.INSTANCE);
// priorityQueueStateType of the job backend should be preserved
assertThat(keyedBackend.getPriorityQueueFactory(), instanceOf(HeapPriorityQueueSetFactory.class));
@@ -254,7 +255,7 @@ public class RocksDBStateBackendConfigTest {
rocksDbBackend.setDbStoragePath(configuredPath);
final MockEnvironment env = getMockEnvironment(tempFolder.newFolder());
- RocksDBKeyedStateBackend<Integer> keyedBackend = createKeyedStateBackend(rocksDbBackend, env);
+ RocksDBKeyedStateBackend<Integer> keyedBackend = createKeyedStateBackend(rocksDbBackend, env, IntSerializer.INSTANCE);
try {
File instanceBasePath = keyedBackend.getInstanceBasePath();
@@ -725,23 +726,6 @@ public class RocksDBStateBackendConfigTest {
// Utilities
// ------------------------------------------------------------------------
- static RocksDBKeyedStateBackend<Integer> createKeyedStateBackend(
- RocksDBStateBackend rocksDbBackend, Environment env) throws Exception {
-
- return (RocksDBKeyedStateBackend<Integer>) rocksDbBackend.createKeyedStateBackend(
- env,
- env.getJobID(),
- "test_op",
- IntSerializer.INSTANCE,
- 1,
- new KeyGroupRange(0, 0),
- env.getTaskKvStateRegistry(),
- TtlTimeProvider.DEFAULT,
- new UnregisteredMetricsGroup(),
- Collections.emptyList(),
- new CloseableRegistry());
- }
-
static MockEnvironment getMockEnvironment(File... tempDirs) {
final String[] tempDirStrings = new String[tempDirs.length];
for (int i = 0; i < tempDirs.length; i++) {
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateMisuseOptionTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateMisuseOptionTest.java
new file mode 100644
index 0000000..59a4822
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateMisuseOptionTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
+import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.operators.TimerHeapInternalTimer;
+import org.apache.flink.streaming.api.operators.TimerSerializer;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.DBOptions;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.contrib.streaming.state.RocksDBTestUtils.createKeyedStateBackend;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests covering cases where, even if users misuse some options, the RocksDB state backend still works as expected or gives explicit feedback.
+ *
+ * <p>The RocksDB state backend performs some internal operations based on RocksDB's APIs that are transparent to users.
+ * However, users can still configure options via {@link RocksDBOptionsFactory}, which might cause some of these operations
+ * to return unexpected results, e.g. FLINK-17800.
+ */
+public class RocksDBStateMisuseOptionTest {
+
+	@Rule
+	public final TemporaryFolder tempFolder = new TemporaryFolder();
+
+	/**
+	 * Tests the case where a user misuses optimizeForPointLookup together with the iterator interfaces of map state.
+	 *
+	 * <p>The option {@link ColumnFamilyOptions#optimizeForPointLookup(long)} would otherwise make iterator.seek with prefix bytes invalid.
+	 */
+	@Test
+	public void testMisuseOptimizePointLookupWithMapState() throws Exception {
+		RocksDBStateBackend rocksDBStateBackend = createStateBackendWithOptimizePointLookup();
+		RocksDBKeyedStateBackend<Integer> keyedStateBackend = createKeyedStateBackend(rocksDBStateBackend, new MockEnvironmentBuilder().build(), IntSerializer.INSTANCE);
+		try {
+			MapStateDescriptor<Integer, Long> stateDescriptor = new MapStateDescriptor<>("map", IntSerializer.INSTANCE, LongSerializer.INSTANCE);
+			MapState<Integer, Long> mapState = keyedStateBackend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, stateDescriptor);
+			keyedStateBackend.setCurrentKey(1);
+			Map<Integer, Long> expectedResult = new HashMap<>();
+			for (int i = 0; i < 100; i++) {
+				long uv = ThreadLocalRandom.current().nextLong();
+				mapState.put(i, uv);
+				expectedResult.put(i, uv);
+			}
+			Iterator<Map.Entry<Integer, Long>> iterator = mapState.entries().iterator();
+			while (iterator.hasNext()) {
+				Map.Entry<Integer, Long> entry = iterator.next();
+				assertEquals(expectedResult.remove(entry.getKey()), entry.getValue());
+				iterator.remove();
+			}
+			assertTrue(expectedResult.isEmpty());
+			assertTrue(mapState.isEmpty());
+		} finally {
+			keyedStateBackend.dispose(); // dispose the backend even when an assertion fails
+		}
+	}
+
+	/**
+	 * Tests the case where a user misuses optimizeForPointLookup together with peek operations on a priority queue.
+	 *
+	 * <p>The option {@link ColumnFamilyOptions#optimizeForPointLookup(long)} would otherwise make iterator.seek with prefix bytes invalid.
+	 */
+	@Test
+	public void testMisuseOptimizePointLookupWithPriorityQueue() throws IOException {
+		RocksDBStateBackend rocksDBStateBackend = createStateBackendWithOptimizePointLookup();
+		RocksDBKeyedStateBackend<Integer> keyedStateBackend = createKeyedStateBackend(rocksDBStateBackend, new MockEnvironmentBuilder().build(), IntSerializer.INSTANCE);
+		try {
+			KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<Integer, VoidNamespace>> priorityQueue = keyedStateBackend.create("timer", new TimerSerializer<>(keyedStateBackend.getKeySerializer(), VoidNamespaceSerializer.INSTANCE));
+			PriorityQueue<TimerHeapInternalTimer<Integer, VoidNamespace>> expectedPriorityQueue = new PriorityQueue<>((o1, o2) -> Long.compare(o1.getTimestamp(), o2.getTimestamp()));
+			// ensure we insert timers more than cache capacity.
+			int queueSize = RocksDBPriorityQueueSetFactory.DEFAULT_CACHES_SIZE + 42;
+			List<Integer> timeStamps = IntStream.range(0, queueSize).boxed().collect(Collectors.toList());
+			Collections.shuffle(timeStamps);
+			for (Integer timeStamp : timeStamps) {
+				TimerHeapInternalTimer<Integer, VoidNamespace> timer = new TimerHeapInternalTimer<>(timeStamp, timeStamp, VoidNamespace.INSTANCE);
+				priorityQueue.add(timer);
+				expectedPriorityQueue.add(timer);
+			}
+			assertEquals(queueSize, priorityQueue.size());
+			for (TimerHeapInternalTimer<Integer, VoidNamespace> timer = priorityQueue.poll(); timer != null; timer = priorityQueue.poll()) {
+				assertEquals(expectedPriorityQueue.poll(), timer);
+			}
+			assertTrue(expectedPriorityQueue.isEmpty());
+			assertTrue(priorityQueue.isEmpty());
+		} finally {
+			keyedStateBackend.dispose(); // dispose the backend even when an assertion fails
+		}
+	}
+
+	private RocksDBStateBackend createStateBackendWithOptimizePointLookup() throws IOException {
+		RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(tempFolder.newFolder().toURI(), true);
+		rocksDBStateBackend.setPriorityQueueStateType(RocksDBStateBackend.PriorityQueueStateType.ROCKSDB);
+		rocksDBStateBackend.setRocksDBOptions(new RocksDBOptionsFactory() {
+			private static final long serialVersionUID = 1L;
+			@Override
+			public DBOptions createDBOptions(DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
+				return currentOptions;
+			}
+
+			@Override
+			public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
+				return currentOptions.optimizeForPointLookup(64); // the misused option these tests exercise
+			}
+		});
+		return rocksDBStateBackend;
+	}
+}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBTestUtils.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBTestUtils.java
index ed44a73..cee56aa 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBTestUtils.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBTestUtils.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.state.KeyGroupRange;
@@ -35,6 +36,7 @@ import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.RocksDB;
import java.io.File;
+import java.io.IOException;
import java.util.Collections;
/**
@@ -98,4 +100,23 @@ public final class RocksDBTestUtils {
defaultCFHandle,
new CloseableRegistry());
}
+
+	/** Creates the keyed state backend of {@code rocksDbBackend} for operator "test_op" over key-group range [0, 0]. */
+	public static <K> RocksDBKeyedStateBackend<K> createKeyedStateBackend(
+			RocksDBStateBackend rocksDbBackend, Environment env,
+			TypeSerializer<K> keySerializer) throws IOException {
+		final KeyGroupRange keyGroupRange = new KeyGroupRange(0, 0);
+		return (RocksDBKeyedStateBackend<K>) rocksDbBackend.createKeyedStateBackend(
+			env,
+			env.getJobID(),
+			"test_op",
+			keySerializer,
+			1,
+			keyGroupRange,
+			env.getTaskKvStateRegistry(),
+			TtlTimeProvider.DEFAULT,
+			new UnregisteredMetricsGroup(),
+			Collections.emptyList(),
+			new CloseableRegistry());
+	}
}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksKeyGroupsRocksSingleStateIteratorTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksKeyGroupsRocksSingleStateIteratorTest.java
index 4447f08..2c73fc7 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksKeyGroupsRocksSingleStateIteratorTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksKeyGroupsRocksSingleStateIteratorTest.java
@@ -30,6 +30,7 @@ import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import java.io.DataOutputStream;
@@ -74,7 +75,8 @@ public class RocksKeyGroupsRocksSingleStateIteratorTest {
public void testMergeIterator(int maxParallelism) throws Exception {
Random random = new Random(1234);
- try (RocksDB rocksDB = RocksDB.open(tempFolder.getRoot().getAbsolutePath())) {
+ try (ReadOptions readOptions = new ReadOptions();
+ RocksDB rocksDB = RocksDB.open(tempFolder.getRoot().getAbsolutePath())) {
List<Tuple2<RocksIteratorWrapper, Integer>> rocksIteratorsWithKVStateId = new ArrayList<>();
List<Tuple2<ColumnFamilyHandle, Integer>> columnFamilyHandlesWithKeyCount = new ArrayList<>();
@@ -108,7 +110,7 @@ public class RocksKeyGroupsRocksSingleStateIteratorTest {
int id = 0;
for (Tuple2<ColumnFamilyHandle, Integer> columnFamilyHandle : columnFamilyHandlesWithKeyCount) {
- rocksIteratorsWithKVStateId.add(new Tuple2<>(RocksDBOperationUtils.getRocksIterator(rocksDB, columnFamilyHandle.f0), id));
+ rocksIteratorsWithKVStateId.add(new Tuple2<>(RocksDBOperationUtils.getRocksIterator(rocksDB, columnFamilyHandle.f0, readOptions), id));
++id;
}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java
index 96003d6..ce06d1f 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java
@@ -19,7 +19,6 @@
package org.apache.flink.contrib.streaming.state.benchmark;
import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
-import org.apache.flink.contrib.streaming.state.RocksDBOperationUtils;
import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper;
import org.apache.flink.core.memory.MemoryUtils;
import org.apache.flink.testutils.junit.RetryOnFailure;
@@ -167,7 +166,7 @@ public class RocksDBPerformanceTest extends TestLogger {
int pos = 0;
- try (final RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(rocksDB)) {
+ try (final RocksIteratorWrapper iterator = new RocksIteratorWrapper(rocksDB.newIterator())) {
// seek to start
unsafe.putInt(keyTemplate, offset, 0);
iterator.seek(keyTemplate);