You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2021/02/05 17:41:25 UTC

[flink] 09/09: [hotfix] Fix RocksDB resource handling in RocksKeyGroupsRocksSingleStateIteratorTest

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f63b9adf268a29c72237dd542f8140e565ecd7b3
Author: Aljoscha Krettek <al...@apache.org>
AuthorDate: Thu Feb 4 17:27:13 2021 +0100

    [hotfix] Fix RocksDB resource handling in RocksKeyGroupsRocksSingleStateIteratorTest
---
 .../state/RocksKeyGroupsRocksSingleStateIteratorTest.java   | 13 +++++++------
 1 file changed, 7 insertions(+), 6 deletions(-)

diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksKeyGroupsRocksSingleStateIteratorTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksKeyGroupsRocksSingleStateIteratorTest.java
index 586794c..6361abc 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksKeyGroupsRocksSingleStateIteratorTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksKeyGroupsRocksSingleStateIteratorTest.java
@@ -118,20 +118,21 @@ public class RocksKeyGroupsRocksSingleStateIteratorTest {
                 totalKeysExpected += numKeys;
             }
 
+            CloseableRegistry closeableRegistry = new CloseableRegistry();
             int id = 0;
             for (Tuple2<ColumnFamilyHandle, Integer> columnFamilyHandle :
                     columnFamilyHandlesWithKeyCount) {
-                rocksIteratorsWithKVStateId.add(
-                        new Tuple2<>(
-                                RocksDBOperationUtils.getRocksIterator(
-                                        rocksDB, columnFamilyHandle.f0, readOptions),
-                                id));
+                RocksIteratorWrapper rocksIterator =
+                        RocksDBOperationUtils.getRocksIterator(
+                                rocksDB, columnFamilyHandle.f0, readOptions);
+                closeableRegistry.registerCloseable(rocksIterator);
+                rocksIteratorsWithKVStateId.add(new Tuple2<>(rocksIterator, id));
                 ++id;
             }
 
             try (RocksStatesPerKeyGroupMergeIterator mergeIterator =
                     new RocksStatesPerKeyGroupMergeIterator(
-                            new CloseableRegistry(),
+                            closeableRegistry,
                             rocksIteratorsWithKVStateId,
                             maxParallelism <= Byte.MAX_VALUE ? 1 : 2)) {