You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2021/02/05 17:41:25 UTC
[flink] 09/09: [hotfix] Fix RocksDB resource handling in
RocksKeyGroupsRocksSingleStateIteratorTest
This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit f63b9adf268a29c72237dd542f8140e565ecd7b3
Author: Aljoscha Krettek <al...@apache.org>
AuthorDate: Thu Feb 4 17:27:13 2021 +0100
[hotfix] Fix RocksDB resource handling in RocksKeyGroupsRocksSingleStateIteratorTest
---
.../state/RocksKeyGroupsRocksSingleStateIteratorTest.java | 13 +++++++------
1 file changed, 7 insertions(+), 6 deletions(-)
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksKeyGroupsRocksSingleStateIteratorTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksKeyGroupsRocksSingleStateIteratorTest.java
index 586794c..6361abc 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksKeyGroupsRocksSingleStateIteratorTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksKeyGroupsRocksSingleStateIteratorTest.java
@@ -118,20 +118,21 @@ public class RocksKeyGroupsRocksSingleStateIteratorTest {
totalKeysExpected += numKeys;
}
+ CloseableRegistry closeableRegistry = new CloseableRegistry();
int id = 0;
for (Tuple2<ColumnFamilyHandle, Integer> columnFamilyHandle :
columnFamilyHandlesWithKeyCount) {
- rocksIteratorsWithKVStateId.add(
- new Tuple2<>(
- RocksDBOperationUtils.getRocksIterator(
- rocksDB, columnFamilyHandle.f0, readOptions),
- id));
+ RocksIteratorWrapper rocksIterator =
+ RocksDBOperationUtils.getRocksIterator(
+ rocksDB, columnFamilyHandle.f0, readOptions);
+ closeableRegistry.registerCloseable(rocksIterator);
+ rocksIteratorsWithKVStateId.add(new Tuple2<>(rocksIterator, id));
++id;
}
try (RocksStatesPerKeyGroupMergeIterator mergeIterator =
new RocksStatesPerKeyGroupMergeIterator(
- new CloseableRegistry(),
+ closeableRegistry,
rocksIteratorsWithKVStateId,
maxParallelism <= Byte.MAX_VALUE ? 1 : 2)) {