You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by li...@apache.org on 2020/06/21 04:56:20 UTC
[flink] 02/03: Revert "[FLINK-17800][rocksdb] Ensure total order
seek to avoid user misuse"
This is an automated email from the ASF dual-hosted git repository.
liyu pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 831daaae2c0cdef6871001e856dfd6a54da2a943
Author: Yu Li <li...@apache.org>
AuthorDate: Sat Jun 20 16:59:06 2020 +0800
Revert "[FLINK-17800][rocksdb] Ensure total order seek to avoid user misuse"
This reverts commit b8ddbef9a5cc5dc769ba61bd5019dd96843c932f.
---
.../state/RocksDBCachingPriorityQueueSet.java | 8 +-
.../state/RocksDBIncrementalCheckpointUtils.java | 4 +-
.../streaming/state/RocksDBKeyedStateBackend.java | 23 +---
.../state/RocksDBKeyedStateBackendBuilder.java | 17 +--
.../contrib/streaming/state/RocksDBMapState.java | 6 +-
.../streaming/state/RocksDBOperationUtils.java | 15 +--
.../state/RocksDBPriorityQueueSetFactory.java | 9 +-
.../RocksDBIncrementalRestoreOperation.java | 7 +-
...rtitionedPriorityQueueWithRocksDBStoreTest.java | 1 -
.../contrib/streaming/state/RocksDBResource.java | 4 +-
.../state/RocksDBRocksStateKeysIteratorTest.java | 2 +-
.../state/RocksDBStateBackendConfigTest.java | 28 +++-
.../state/RocksDBStateMisuseOptionTest.java | 147 ---------------------
.../contrib/streaming/state/RocksDBTestUtils.java | 21 ---
...RocksKeyGroupsRocksSingleStateIteratorTest.java | 6 +-
.../state/benchmark/RocksDBPerformanceTest.java | 3 +-
.../api/operators/TimerHeapInternalTimer.java | 2 +-
17 files changed, 47 insertions(+), 256 deletions(-)
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBCachingPriorityQueueSet.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBCachingPriorityQueueSet.java
index fb9a833..364185a 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBCachingPriorityQueueSet.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBCachingPriorityQueueSet.java
@@ -29,7 +29,6 @@ import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.shaded.guava18.com.google.common.primitives.UnsignedBytes;
import org.rocksdb.ColumnFamilyHandle;
-import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
@@ -64,9 +63,6 @@ public class RocksDBCachingPriorityQueueSet<E extends HeapPriorityQueueElement>
@Nonnull
private final RocksDB db;
- @Nonnull
- private final ReadOptions readOptions;
-
/** Handle to the column family of the RocksDB instance in which the elements are stored. */
@Nonnull
private final ColumnFamilyHandle columnFamilyHandle;
@@ -116,7 +112,6 @@ public class RocksDBCachingPriorityQueueSet<E extends HeapPriorityQueueElement>
@Nonnegative int keyGroupId,
@Nonnegative int keyGroupPrefixBytes,
@Nonnull RocksDB db,
- @Nonnull ReadOptions readOptions,
@Nonnull ColumnFamilyHandle columnFamilyHandle,
@Nonnull TypeSerializer<E> byteOrderProducingSerializer,
@Nonnull DataOutputSerializer outputStream,
@@ -124,7 +119,6 @@ public class RocksDBCachingPriorityQueueSet<E extends HeapPriorityQueueElement>
@Nonnull RocksDBWriteBatchWrapper batchWrapper,
@Nonnull OrderedByteArraySetCache orderedByteArraySetCache) {
this.db = db;
- this.readOptions = readOptions;
this.columnFamilyHandle = columnFamilyHandle;
this.byteOrderProducingSerializer = byteOrderProducingSerializer;
this.batchWrapper = batchWrapper;
@@ -310,7 +304,7 @@ public class RocksDBCachingPriorityQueueSet<E extends HeapPriorityQueueElement>
flushWriteBatch();
return new RocksBytesIterator(
new RocksIteratorWrapper(
- db.newIterator(columnFamilyHandle, readOptions)));
+ db.newIterator(columnFamilyHandle)));
}
/**
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java
index 5bce695..1f43dd0 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java
@@ -21,7 +21,6 @@ import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.rocksdb.ColumnFamilyHandle;
-import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
@@ -117,8 +116,7 @@ public class RocksDBIncrementalCheckpointUtils {
@Nonnegative long writeBatchSize) throws RocksDBException {
for (ColumnFamilyHandle columnFamilyHandle : columnFamilyHandles) {
- try (ReadOptions readOptions = RocksDBOperationUtils.createTotalOrderSeekReadOptions();
- RocksIteratorWrapper iteratorWrapper = RocksDBOperationUtils.getRocksIterator(db, columnFamilyHandle, readOptions);
+ try (RocksIteratorWrapper iteratorWrapper = RocksDBOperationUtils.getRocksIterator(db, columnFamilyHandle);
RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(db, writeBatchSize)) {
iteratorWrapper.seek(beginKeyBytes);
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 505bd2a..2ddb79b 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -66,7 +66,6 @@ import org.apache.flink.util.StateMigrationException;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
-import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.Snapshot;
@@ -151,12 +150,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
private final WriteOptions writeOptions;
/**
- * The read options to use when creating iterators.
- * We ensure total order seek in case user misuse, see FLINK-17800 for more details.
- */
- private final ReadOptions readOptions;
-
- /**
* The max memory size for one batch in {@link RocksDBWriteBatchWrapper}.
*/
private final long writeBatchSize;
@@ -219,8 +212,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
ExecutionConfig executionConfig,
TtlTimeProvider ttlTimeProvider,
RocksDB db,
- WriteOptions writeOptions,
- ReadOptions readOptions,
LinkedHashMap<String, RocksDbKvStateInfo> kvStateInformation,
int keyGroupPrefixBytes,
CloseableRegistry cancelStreamRegistry,
@@ -259,8 +250,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
this.keyGroupPrefixBytes = keyGroupPrefixBytes;
this.kvStateInformation = kvStateInformation;
- this.writeOptions = writeOptions;
- this.readOptions = readOptions;
+ this.writeOptions = new WriteOptions().setDisableWAL(true);
checkArgument(writeBatchSize >= 0, "Write batch size have to be no negative value.");
this.writeBatchSize = writeBatchSize;
this.db = db;
@@ -300,7 +290,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
throw new FlinkRuntimeException("Failed to get keys from RocksDB state backend.", ex);
}
- RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(db, columnInfo.columnFamilyHandle, readOptions);
+ RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(db, columnInfo.columnFamilyHandle);
iterator.seekToFirst();
final RocksStateKeysIterator<K> iteratorWrapper = new RocksStateKeysIterator<>(iterator, state, getKeySerializer(), keyGroupPrefixBytes,
@@ -370,7 +360,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
columnFamilyOptions.forEach(IOUtils::closeQuietly);
IOUtils.closeQuietly(optionsContainer);
- IOUtils.closeQuietly(readOptions);
IOUtils.closeQuietly(writeOptions);
ttlCompactFiltersManager.disposeAndClearRegisteredCompactionFactories();
@@ -418,10 +407,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
return writeOptions;
}
- public ReadOptions getReadOptions() {
- return readOptions;
- }
-
RocksDBSerializedCompositeKeyBuilder<K> getSharedRocksKeyBuilder() {
return sharedRocksKeyBuilder;
}
@@ -614,7 +599,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
Snapshot rocksDBSnapshot = db.getSnapshot();
try (
- RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(db, stateMetaInfo.f0, readOptions);
+ RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(db, stateMetaInfo.f0);
RocksDBWriteBatchWrapper batchWriter = new RocksDBWriteBatchWrapper(db, getWriteOptions(), getWriteBatchSize())
) {
iterator.seekToFirst();
@@ -689,7 +674,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
for (RocksDbKvStateInfo metaInfo : kvStateInformation.values()) {
//TODO maybe filterOrTransform only for k/v states
- try (RocksIteratorWrapper rocksIterator = RocksDBOperationUtils.getRocksIterator(db, metaInfo.columnFamilyHandle, readOptions)) {
+ try (RocksIteratorWrapper rocksIterator = RocksDBOperationUtils.getRocksIterator(db, metaInfo.columnFamilyHandle)) {
rocksIterator.seekToFirst();
while (rocksIterator.isValid()) {
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
index 08b4864..70c438e 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
@@ -54,7 +54,6 @@ import org.apache.flink.util.ResourceGuard;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
-import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.WriteOptions;
import org.slf4j.Logger;
@@ -251,7 +250,6 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
CloseableRegistry cancelStreamRegistryForBackend = new CloseableRegistry();
// The write options to use in the states.
WriteOptions writeOptions = null;
- ReadOptions readOptions = null;
LinkedHashMap<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation = new LinkedHashMap<>();
RocksDB db = null;
AbstractRocksDBRestoreOperation restoreOperation = null;
@@ -290,7 +288,6 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
}
writeOptions = new WriteOptions().setDisableWAL(true);
- readOptions = RocksDBOperationUtils.createTotalOrderSeekReadOptions();
writeBatchWrapper = new RocksDBWriteBatchWrapper(db, writeOptions, writeBatchSize);
// it is important that we only create the key builder after the restore, and not before;
// restore operations may reconfigure the key serializer, so accessing the key serializer
@@ -303,13 +300,8 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
snapshotStrategy = initializeSavepointAndCheckpointStrategies(cancelStreamRegistryForBackend, rocksDBResourceGuard,
kvStateInformation, keyGroupPrefixBytes, db, backendUID, materializedSstFiles, lastCompletedCheckpointId);
// init priority queue factory
- priorityQueueFactory = initPriorityQueueFactory(
- keyGroupPrefixBytes,
- kvStateInformation,
- db,
- readOptions,
- writeBatchWrapper,
- nativeMetricMonitor);
+ priorityQueueFactory = initPriorityQueueFactory(keyGroupPrefixBytes, kvStateInformation, db,
+ writeBatchWrapper, nativeMetricMonitor);
} catch (Throwable e) {
// Do clean up
List<ColumnFamilyOptions> columnFamilyOptions = new ArrayList<>(kvStateInformation.values().size());
@@ -327,7 +319,6 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
IOUtils.closeQuietly(restoreOperation);
IOUtils.closeAllQuietly(columnFamilyOptions);
IOUtils.closeQuietly(optionsContainer);
- IOUtils.closeQuietly(readOptions);
IOUtils.closeQuietly(writeOptions);
ttlCompactFiltersManager.disposeAndClearRegisteredCompactionFactories();
kvStateInformation.clear();
@@ -359,8 +350,6 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
this.executionConfig,
this.ttlTimeProvider,
db,
- writeOptions,
- readOptions,
kvStateInformation,
keyGroupPrefixBytes,
cancelStreamRegistryForBackend,
@@ -489,7 +478,6 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
int keyGroupPrefixBytes,
Map<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation,
RocksDB db,
- ReadOptions readOptions,
RocksDBWriteBatchWrapper writeBatchWrapper,
RocksDBNativeMetricMonitor nativeMetricMonitor) {
PriorityQueueSetFactory priorityQueueFactory;
@@ -504,7 +492,6 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
numberOfKeyGroups,
kvStateInformation,
db,
- readOptions,
writeBatchWrapper,
nativeMetricMonitor,
columnFamilyOptionsFactory
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
index 7ba3edd..ad9281c 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
@@ -243,7 +243,7 @@ class RocksDBMapState<K, N, UK, UV>
public boolean isEmpty() {
final byte[] prefixBytes = serializeCurrentKeyWithGroupAndNamespace();
- try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(backend.db, columnFamily, backend.getReadOptions())) {
+ try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(backend.db, columnFamily)) {
iterator.seek(prefixBytes);
@@ -254,7 +254,7 @@ class RocksDBMapState<K, N, UK, UV>
@Override
public void clear() {
try {
- try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(backend.db, columnFamily, backend.getReadOptions());
+ try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(backend.db, columnFamily);
RocksDBWriteBatchWrapper rocksDBWriteBatchWrapper = new RocksDBWriteBatchWrapper(backend.db, backend.getWriteOptions(), backend.getWriteBatchSize())) {
final byte[] keyPrefixBytes = serializeCurrentKeyWithGroupAndNamespace();
@@ -577,7 +577,7 @@ class RocksDBMapState<K, N, UK, UV>
// use try-with-resources to ensure RocksIterator can be release even some runtime exception
// occurred in the below code block.
- try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(db, columnFamily, backend.getReadOptions())) {
+ try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(db, columnFamily)) {
/*
* The iteration starts from the prefix bytes at the first loading. After #nextEntry() is called,
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java
index 0f564d5..1455c1b 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java
@@ -32,7 +32,6 @@ import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
-import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.slf4j.Logger;
@@ -95,18 +94,12 @@ public class RocksDBOperationUtils {
return dbRef;
}
- public static RocksIteratorWrapper getRocksIterator(RocksDB db, ColumnFamilyHandle columnFamilyHandle, ReadOptions readOptions) {
- return new RocksIteratorWrapper(db.newIterator(columnFamilyHandle, readOptions));
+ public static RocksIteratorWrapper getRocksIterator(RocksDB db) {
+ return new RocksIteratorWrapper(db.newIterator());
}
- /**
- * Create a total order read option to avoid user misuse, see FLINK-17800 for more details.
- *
- * <p>Note, remember to close the generated {@link ReadOptions} when dispose.
- */
- // TODO We would remove this method once we bump RocksDB version larger than 6.2.2.
- public static ReadOptions createTotalOrderSeekReadOptions() {
- return new ReadOptions().setTotalOrderSeek(true);
+ public static RocksIteratorWrapper getRocksIterator(RocksDB db, ColumnFamilyHandle columnFamilyHandle) {
+ return new RocksIteratorWrapper(db.newIterator(columnFamilyHandle));
}
public static void registerKvStateInformation(
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBPriorityQueueSetFactory.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBPriorityQueueSetFactory.java
index 59daff8..1cbeebe 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBPriorityQueueSetFactory.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBPriorityQueueSetFactory.java
@@ -17,7 +17,6 @@
package org.apache.flink.contrib.streaming.state;
-import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.core.memory.DataInputDeserializer;
@@ -37,7 +36,6 @@ import org.apache.flink.util.StateMigrationException;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
-import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import javax.annotation.Nonnull;
@@ -54,8 +52,7 @@ public class RocksDBPriorityQueueSetFactory implements PriorityQueueSetFactory {
/**
* Default cache size per key-group.
*/
- @VisibleForTesting
- static final int DEFAULT_CACHES_SIZE = 128; //TODO make this configurable
+ private static final int DEFAULT_CACHES_SIZE = 128; //TODO make this configurable
/**
* A shared buffer to serialize elements for the priority queue.
@@ -74,7 +71,6 @@ public class RocksDBPriorityQueueSetFactory implements PriorityQueueSetFactory {
private final int numberOfKeyGroups;
private final Map<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation;
private final RocksDB db;
- private final ReadOptions readOptions;
private final RocksDBWriteBatchWrapper writeBatchWrapper;
private final RocksDBNativeMetricMonitor nativeMetricMonitor;
private final Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory;
@@ -85,7 +81,6 @@ public class RocksDBPriorityQueueSetFactory implements PriorityQueueSetFactory {
int numberOfKeyGroups,
Map<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation,
RocksDB db,
- ReadOptions readOptions,
RocksDBWriteBatchWrapper writeBatchWrapper,
RocksDBNativeMetricMonitor nativeMetricMonitor,
Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory) {
@@ -94,7 +89,6 @@ public class RocksDBPriorityQueueSetFactory implements PriorityQueueSetFactory {
this.numberOfKeyGroups = numberOfKeyGroups;
this.kvStateInformation = kvStateInformation;
this.db = db;
- this.readOptions = readOptions;
this.writeBatchWrapper = writeBatchWrapper;
this.nativeMetricMonitor = nativeMetricMonitor;
this.columnFamilyOptionsFactory = columnFamilyOptionsFactory;
@@ -128,7 +122,6 @@ public class RocksDBPriorityQueueSetFactory implements PriorityQueueSetFactory {
keyGroupId,
keyGroupPrefixBytes,
db,
- readOptions,
columnFamilyHandle,
byteOrderedElementSerializer,
sharedElementOutView,
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
index 53c7537..f7abed6 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
@@ -51,7 +51,6 @@ import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
-import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.slf4j.Logger;
@@ -310,7 +309,7 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor
null, tmpRestoreDBInfo.stateMetaInfoSnapshots.get(i))
.columnFamilyHandle;
- try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(tmpRestoreDBInfo.db, tmpColumnFamilyHandle, tmpRestoreDBInfo.readOptions)) {
+ try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(tmpRestoreDBInfo.db, tmpColumnFamilyHandle)) {
iterator.seek(startKeyGroupPrefixBytes);
@@ -377,8 +376,6 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor
@Nonnull
private final List<StateMetaInfoSnapshot> stateMetaInfoSnapshots;
- private final ReadOptions readOptions;
-
private RestoredDBInstance(
@Nonnull RocksDB db,
@Nonnull List<ColumnFamilyHandle> columnFamilyHandles,
@@ -389,7 +386,6 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor
this.columnFamilyHandles = columnFamilyHandles;
this.columnFamilyDescriptors = columnFamilyDescriptors;
this.stateMetaInfoSnapshots = stateMetaInfoSnapshots;
- this.readOptions = RocksDBOperationUtils.createTotalOrderSeekReadOptions();
}
@Override
@@ -401,7 +397,6 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor
IOUtils.closeAllQuietly(columnFamilyHandles);
IOUtils.closeQuietly(db);
IOUtils.closeAllQuietly(columnFamilyOptions);
- IOUtils.closeQuietly(readOptions);
}
}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/KeyGroupPartitionedPriorityQueueWithRocksDBStoreTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/KeyGroupPartitionedPriorityQueueWithRocksDBStoreTest.java
index fd68d70..d402c3d 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/KeyGroupPartitionedPriorityQueueWithRocksDBStoreTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/KeyGroupPartitionedPriorityQueueWithRocksDBStoreTest.java
@@ -60,7 +60,6 @@ public class KeyGroupPartitionedPriorityQueueWithRocksDBStoreTest extends Intern
keyGroupId,
keyGroupPrefixBytes,
rocksDBResource.getRocksDB(),
- rocksDBResource.getReadOptions(),
rocksDBResource.getDefaultColumnFamily(),
TestElementSerializer.INSTANCE,
outputStreamWithPos,
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResource.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResource.java
index 3b3b697..d25baa7 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResource.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResource.java
@@ -101,7 +101,7 @@ public class RocksDBResource extends ExternalResource {
LOG.error("Close previous ColumnOptions's instance failed.", e);
}
- return PredefinedOptions.FLASH_SSD_OPTIMIZED.createColumnOptions(handlesToClose).optimizeForPointLookup(40960);
+ return PredefinedOptions.FLASH_SSD_OPTIMIZED.createColumnOptions(handlesToClose);
}
});
}
@@ -155,7 +155,7 @@ public class RocksDBResource extends ExternalResource {
PredefinedOptions.DEFAULT.createColumnOptions(handlesToClose), handlesToClose);
this.writeOptions = new WriteOptions();
this.writeOptions.disableWAL();
- this.readOptions = RocksDBOperationUtils.createTotalOrderSeekReadOptions();
+ this.readOptions = new ReadOptions();
this.columnFamilyHandles = new ArrayList<>(1);
this.rocksDB = RocksDB.open(
dbOptions,
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksStateKeysIteratorTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksStateKeysIteratorTest.java
index 1a4808f..8f71b65 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksStateKeysIteratorTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksStateKeysIteratorTest.java
@@ -126,7 +126,7 @@ public class RocksDBRocksStateKeysIteratorTest {
ColumnFamilyHandle handle = keyedStateBackend.getColumnFamilyHandle(testStateName);
try (
- RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(keyedStateBackend.db, handle, keyedStateBackend.getReadOptions());
+ RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(keyedStateBackend.db, handle);
RocksStateKeysIterator<K> iteratorWrapper =
new RocksStateKeysIterator<>(
iterator,
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
index cf1fdfa..2167a00 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
@@ -59,7 +59,6 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import static org.apache.flink.contrib.streaming.state.RocksDBTestUtils.createKeyedStateBackend;
import static org.hamcrest.CoreMatchers.anyOf;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -122,7 +121,7 @@ public class RocksDBStateBackendConfigTest {
assertArrayEquals(new String[] { testDir1, testDir2 }, rocksDbBackend.getDbStoragePaths());
final MockEnvironment env = getMockEnvironment(tempFolder.newFolder());
- final RocksDBKeyedStateBackend<Integer> keyedBackend = createKeyedStateBackend(rocksDbBackend, env, IntSerializer.INSTANCE);
+ final RocksDBKeyedStateBackend<Integer> keyedBackend = createKeyedStateBackend(rocksDbBackend, env);
try {
File instanceBasePath = keyedBackend.getInstanceBasePath();
@@ -159,7 +158,7 @@ public class RocksDBStateBackendConfigTest {
RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(tempFolder.newFolder().toURI().toString());
- RocksDBKeyedStateBackend<Integer> keyedBackend = createKeyedStateBackend(rocksDbBackend, env, IntSerializer.INSTANCE);
+ RocksDBKeyedStateBackend<Integer> keyedBackend = createKeyedStateBackend(rocksDbBackend, env);
Assert.assertEquals(RocksDBPriorityQueueSetFactory.class, keyedBackend.getPriorityQueueFactory().getClass());
keyedBackend.dispose();
@@ -169,7 +168,7 @@ public class RocksDBStateBackendConfigTest {
RocksDBStateBackend.PriorityQueueStateType.HEAP.toString());
rocksDbBackend = rocksDbBackend.configure(conf, Thread.currentThread().getContextClassLoader());
- keyedBackend = createKeyedStateBackend(rocksDbBackend, env, IntSerializer.INSTANCE);
+ keyedBackend = createKeyedStateBackend(rocksDbBackend, env);
Assert.assertEquals(
HeapPriorityQueueSetFactory.class,
keyedBackend.getPriorityQueueFactory().getClass());
@@ -198,7 +197,7 @@ public class RocksDBStateBackendConfigTest {
final RocksDBStateBackend configuredRocksDBStateBackend = backend.configure(
configFromConfFile,
Thread.currentThread().getContextClassLoader());
- final RocksDBKeyedStateBackend<Integer> keyedBackend = createKeyedStateBackend(configuredRocksDBStateBackend, env, IntSerializer.INSTANCE);
+ final RocksDBKeyedStateBackend<Integer> keyedBackend = createKeyedStateBackend(configuredRocksDBStateBackend, env);
// priorityQueueStateType of the job backend should be preserved
assertThat(keyedBackend.getPriorityQueueFactory(), instanceOf(HeapPriorityQueueSetFactory.class));
@@ -255,7 +254,7 @@ public class RocksDBStateBackendConfigTest {
rocksDbBackend.setDbStoragePath(configuredPath);
final MockEnvironment env = getMockEnvironment(tempFolder.newFolder());
- RocksDBKeyedStateBackend<Integer> keyedBackend = createKeyedStateBackend(rocksDbBackend, env, IntSerializer.INSTANCE);
+ RocksDBKeyedStateBackend<Integer> keyedBackend = createKeyedStateBackend(rocksDbBackend, env);
try {
File instanceBasePath = keyedBackend.getInstanceBasePath();
@@ -726,6 +725,23 @@ public class RocksDBStateBackendConfigTest {
// Utilities
// ------------------------------------------------------------------------
+ static RocksDBKeyedStateBackend<Integer> createKeyedStateBackend(
+ RocksDBStateBackend rocksDbBackend, Environment env) throws Exception {
+
+ return (RocksDBKeyedStateBackend<Integer>) rocksDbBackend.createKeyedStateBackend(
+ env,
+ env.getJobID(),
+ "test_op",
+ IntSerializer.INSTANCE,
+ 1,
+ new KeyGroupRange(0, 0),
+ env.getTaskKvStateRegistry(),
+ TtlTimeProvider.DEFAULT,
+ new UnregisteredMetricsGroup(),
+ Collections.emptyList(),
+ new CloseableRegistry());
+ }
+
static MockEnvironment getMockEnvironment(File... tempDirs) {
final String[] tempDirStrings = new String[tempDirs.length];
for (int i = 0; i < tempDirs.length; i++) {
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateMisuseOptionTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateMisuseOptionTest.java
deleted file mode 100644
index 59a4822..0000000
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateMisuseOptionTest.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.contrib.streaming.state;
-
-import org.apache.flink.api.common.state.MapState;
-import org.apache.flink.api.common.state.MapStateDescriptor;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.common.typeutils.base.LongSerializer;
-import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
-import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
-import org.apache.flink.runtime.state.VoidNamespace;
-import org.apache.flink.runtime.state.VoidNamespaceSerializer;
-import org.apache.flink.streaming.api.operators.TimerHeapInternalTimer;
-import org.apache.flink.streaming.api.operators.TimerSerializer;
-
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.rocksdb.ColumnFamilyOptions;
-import org.rocksdb.DBOptions;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.PriorityQueue;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
-import static org.apache.flink.contrib.streaming.state.RocksDBTestUtils.createKeyedStateBackend;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Tests to cover cases that even user misuse some options, RocksDB state-backend could still work as expected or give explicit feedback.
- *
- * <p>RocksDB state-backend has some internal operations based on RocksDB's APIs which is transparent for users.
- * However, user could still configure options via {@link RocksDBOptionsFactory}, and might lead some operations
- * could not get expected result, e.g. FLINK-17800
- */
-public class RocksDBStateMisuseOptionTest {
-
- @Rule
- public final TemporaryFolder tempFolder = new TemporaryFolder();
-
- /**
- * Tests to cover case when user misuse optimizeForPointLookup with iterator interfaces on map state.
- *
- * <p>The option {@link ColumnFamilyOptions#optimizeForPointLookup(long)} would lead to iterator.seek with prefix bytes invalid.
- */
- @Test
- public void testMisuseOptimizePointLookupWithMapState() throws Exception {
- RocksDBStateBackend rocksDBStateBackend = createStateBackendWithOptimizePointLookup();
- RocksDBKeyedStateBackend<Integer> keyedStateBackend = createKeyedStateBackend(rocksDBStateBackend, new MockEnvironmentBuilder().build(), IntSerializer.INSTANCE);
- MapStateDescriptor<Integer, Long> stateDescriptor = new MapStateDescriptor<>("map", IntSerializer.INSTANCE, LongSerializer.INSTANCE);
- MapState<Integer, Long> mapState = keyedStateBackend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, stateDescriptor);
-
- keyedStateBackend.setCurrentKey(1);
- Map<Integer, Long> expectedResult = new HashMap<>();
- for (int i = 0; i < 100; i++) {
- long uv = ThreadLocalRandom.current().nextLong();
- mapState.put(i, uv);
- expectedResult.put(i, uv);
- }
-
- Iterator<Map.Entry<Integer, Long>> iterator = mapState.entries().iterator();
- while (iterator.hasNext()) {
- Map.Entry<Integer, Long> entry = iterator.next();
- assertEquals(entry.getValue(), expectedResult.remove(entry.getKey()));
- iterator.remove();
- }
- assertTrue(expectedResult.isEmpty());
- assertTrue(mapState.isEmpty());
- }
-
- /**
- * Tests to cover case when user misuse optimizeForPointLookup with peek operations on priority queue.
- *
- * <p>The option {@link ColumnFamilyOptions#optimizeForPointLookup(long)} would lead to iterator.seek with prefix bytes invalid.
- */
- @Test
- public void testMisuseOptimizePointLookupWithPriorityQueue() throws IOException {
- RocksDBStateBackend rocksDBStateBackend = createStateBackendWithOptimizePointLookup();
- RocksDBKeyedStateBackend<Integer> keyedStateBackend = createKeyedStateBackend(rocksDBStateBackend, new MockEnvironmentBuilder().build(), IntSerializer.INSTANCE);
- KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<Integer, VoidNamespace>> priorityQueue =
- keyedStateBackend.create("timer", new TimerSerializer<>(keyedStateBackend.getKeySerializer(), VoidNamespaceSerializer.INSTANCE));
-
- PriorityQueue<TimerHeapInternalTimer<Integer, VoidNamespace>> expectedPriorityQueue = new PriorityQueue<>((o1, o2) -> (int) (o1.getTimestamp() - o2.getTimestamp()));
- // ensure we insert timers more than cache capacity.
- int queueSize = RocksDBPriorityQueueSetFactory.DEFAULT_CACHES_SIZE + 42;
- List<Integer> timeStamps = IntStream.range(0, queueSize).boxed().collect(Collectors.toList());
- Collections.shuffle(timeStamps);
- for (Integer timeStamp : timeStamps) {
- TimerHeapInternalTimer<Integer, VoidNamespace> timer = new TimerHeapInternalTimer<>(timeStamp, timeStamp, VoidNamespace.INSTANCE);
- priorityQueue.add(timer);
- expectedPriorityQueue.add(timer);
- }
- assertEquals(queueSize, priorityQueue.size());
- TimerHeapInternalTimer<Integer, VoidNamespace> timer;
- while ((timer = priorityQueue.poll()) != null) {
- assertEquals(expectedPriorityQueue.poll(), timer);
- }
- assertTrue(expectedPriorityQueue.isEmpty());
- assertTrue(priorityQueue.isEmpty());
-
- }
-
- private RocksDBStateBackend createStateBackendWithOptimizePointLookup() throws IOException {
- RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(tempFolder.newFolder().toURI(), true);
- rocksDBStateBackend.setPriorityQueueStateType(RocksDBStateBackend.PriorityQueueStateType.ROCKSDB);
- rocksDBStateBackend.setRocksDBOptions(new RocksDBOptionsFactory() {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public DBOptions createDBOptions(DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
- return currentOptions;
- }
-
- @Override
- public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
- return currentOptions.optimizeForPointLookup(64);
- }
- });
- return rocksDBStateBackend;
- }
-}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBTestUtils.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBTestUtils.java
index cee56aa..ed44a73 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBTestUtils.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBTestUtils.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
-import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.state.KeyGroupRange;
@@ -36,7 +35,6 @@ import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.RocksDB;
import java.io.File;
-import java.io.IOException;
import java.util.Collections;
/**
@@ -100,23 +98,4 @@ public final class RocksDBTestUtils {
defaultCFHandle,
new CloseableRegistry());
}
-
- public static <K> RocksDBKeyedStateBackend<K> createKeyedStateBackend(
- RocksDBStateBackend rocksDbBackend,
- Environment env,
- TypeSerializer<K> keySerializer) throws IOException {
-
- return (RocksDBKeyedStateBackend<K>) rocksDbBackend.createKeyedStateBackend(
- env,
- env.getJobID(),
- "test_op",
- keySerializer,
- 1,
- new KeyGroupRange(0, 0),
- env.getTaskKvStateRegistry(),
- TtlTimeProvider.DEFAULT,
- new UnregisteredMetricsGroup(),
- Collections.emptyList(),
- new CloseableRegistry());
- }
}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksKeyGroupsRocksSingleStateIteratorTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksKeyGroupsRocksSingleStateIteratorTest.java
index 2c73fc7..4447f08 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksKeyGroupsRocksSingleStateIteratorTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksKeyGroupsRocksSingleStateIteratorTest.java
@@ -30,7 +30,6 @@ import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
-import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import java.io.DataOutputStream;
@@ -75,8 +74,7 @@ public class RocksKeyGroupsRocksSingleStateIteratorTest {
public void testMergeIterator(int maxParallelism) throws Exception {
Random random = new Random(1234);
- try (ReadOptions readOptions = new ReadOptions();
- RocksDB rocksDB = RocksDB.open(tempFolder.getRoot().getAbsolutePath())) {
+ try (RocksDB rocksDB = RocksDB.open(tempFolder.getRoot().getAbsolutePath())) {
List<Tuple2<RocksIteratorWrapper, Integer>> rocksIteratorsWithKVStateId = new ArrayList<>();
List<Tuple2<ColumnFamilyHandle, Integer>> columnFamilyHandlesWithKeyCount = new ArrayList<>();
@@ -110,7 +108,7 @@ public class RocksKeyGroupsRocksSingleStateIteratorTest {
int id = 0;
for (Tuple2<ColumnFamilyHandle, Integer> columnFamilyHandle : columnFamilyHandlesWithKeyCount) {
- rocksIteratorsWithKVStateId.add(new Tuple2<>(RocksDBOperationUtils.getRocksIterator(rocksDB, columnFamilyHandle.f0, readOptions), id));
+ rocksIteratorsWithKVStateId.add(new Tuple2<>(RocksDBOperationUtils.getRocksIterator(rocksDB, columnFamilyHandle.f0), id));
++id;
}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java
index ce06d1f..96003d6 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.contrib.streaming.state.benchmark;
import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
+import org.apache.flink.contrib.streaming.state.RocksDBOperationUtils;
import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper;
import org.apache.flink.core.memory.MemoryUtils;
import org.apache.flink.testutils.junit.RetryOnFailure;
@@ -166,7 +167,7 @@ public class RocksDBPerformanceTest extends TestLogger {
int pos = 0;
- try (final RocksIteratorWrapper iterator = new RocksIteratorWrapper(rocksDB.newIterator())) {
+ try (final RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(rocksDB)) {
// seek to start
unsafe.putInt(keyTemplate, offset, 0);
iterator.seek(keyTemplate);
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerHeapInternalTimer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerHeapInternalTimer.java
index e02901b..a6194ed 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerHeapInternalTimer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerHeapInternalTimer.java
@@ -50,7 +50,7 @@ public final class TimerHeapInternalTimer<K, N> implements InternalTimer<K, N>,
*/
private transient int timerHeapIndex;
- public TimerHeapInternalTimer(long timestamp, @Nonnull K key, @Nonnull N namespace) {
+ TimerHeapInternalTimer(long timestamp, @Nonnull K key, @Nonnull N namespace) {
this.timestamp = timestamp;
this.key = key;
this.namespace = namespace;