You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by li...@apache.org on 2020/06/26 14:34:03 UTC
[flink] 02/03: [hot-fix][rocksdb] Ensure RocksDBKeyedStateBackend
disposed at RocksDBStateMisuseOptionTest
This is an automated email from the ASF dual-hosted git repository.
liyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 11d45135d85937edd16fb4f8f94ba71f5f794626
Author: Yun Tang <my...@live.com>
AuthorDate: Mon Jun 22 16:27:02 2020 +0800
[hot-fix][rocksdb] Ensure RocksDBKeyedStateBackend disposed at RocksDBStateMisuseOptionTest
This closes #12736.
---
.../state/RocksDBStateMisuseOptionTest.java | 86 ++++++++++++----------
1 file changed, 48 insertions(+), 38 deletions(-)
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateMisuseOptionTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateMisuseOptionTest.java
index 59a4822..20e2906 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateMisuseOptionTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateMisuseOptionTest.java
@@ -71,26 +71,31 @@ public class RocksDBStateMisuseOptionTest {
@Test
public void testMisuseOptimizePointLookupWithMapState() throws Exception {
RocksDBStateBackend rocksDBStateBackend = createStateBackendWithOptimizePointLookup();
- RocksDBKeyedStateBackend<Integer> keyedStateBackend = createKeyedStateBackend(rocksDBStateBackend, new MockEnvironmentBuilder().build(), IntSerializer.INSTANCE);
- MapStateDescriptor<Integer, Long> stateDescriptor = new MapStateDescriptor<>("map", IntSerializer.INSTANCE, LongSerializer.INSTANCE);
- MapState<Integer, Long> mapState = keyedStateBackend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, stateDescriptor);
-
- keyedStateBackend.setCurrentKey(1);
- Map<Integer, Long> expectedResult = new HashMap<>();
- for (int i = 0; i < 100; i++) {
- long uv = ThreadLocalRandom.current().nextLong();
- mapState.put(i, uv);
- expectedResult.put(i, uv);
- }
+ RocksDBKeyedStateBackend<Integer> keyedStateBackend =
+ createKeyedStateBackend(rocksDBStateBackend, new MockEnvironmentBuilder().build(), IntSerializer.INSTANCE);
+ try {
+ MapStateDescriptor<Integer, Long> stateDescriptor = new MapStateDescriptor<>("map", IntSerializer.INSTANCE, LongSerializer.INSTANCE);
+ MapState<Integer, Long> mapState = keyedStateBackend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, stateDescriptor);
+
+ keyedStateBackend.setCurrentKey(1);
+ Map<Integer, Long> expectedResult = new HashMap<>();
+ for (int i = 0; i < 100; i++) {
+ long uv = ThreadLocalRandom.current().nextLong();
+ mapState.put(i, uv);
+ expectedResult.put(i, uv);
+ }
- Iterator<Map.Entry<Integer, Long>> iterator = mapState.entries().iterator();
- while (iterator.hasNext()) {
- Map.Entry<Integer, Long> entry = iterator.next();
- assertEquals(entry.getValue(), expectedResult.remove(entry.getKey()));
- iterator.remove();
+ Iterator<Map.Entry<Integer, Long>> iterator = mapState.entries().iterator();
+ while (iterator.hasNext()) {
+ Map.Entry<Integer, Long> entry = iterator.next();
+ assertEquals(entry.getValue(), expectedResult.remove(entry.getKey()));
+ iterator.remove();
+ }
+ assertTrue(expectedResult.isEmpty());
+ assertTrue(mapState.isEmpty());
+ } finally {
+ keyedStateBackend.dispose();
}
- assertTrue(expectedResult.isEmpty());
- assertTrue(mapState.isEmpty());
}
/**
@@ -101,27 +106,32 @@ public class RocksDBStateMisuseOptionTest {
@Test
public void testMisuseOptimizePointLookupWithPriorityQueue() throws IOException {
RocksDBStateBackend rocksDBStateBackend = createStateBackendWithOptimizePointLookup();
- RocksDBKeyedStateBackend<Integer> keyedStateBackend = createKeyedStateBackend(rocksDBStateBackend, new MockEnvironmentBuilder().build(), IntSerializer.INSTANCE);
- KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<Integer, VoidNamespace>> priorityQueue =
- keyedStateBackend.create("timer", new TimerSerializer<>(keyedStateBackend.getKeySerializer(), VoidNamespaceSerializer.INSTANCE));
-
- PriorityQueue<TimerHeapInternalTimer<Integer, VoidNamespace>> expectedPriorityQueue = new PriorityQueue<>((o1, o2) -> (int) (o1.getTimestamp() - o2.getTimestamp()));
- // ensure we insert timers more than cache capacity.
- int queueSize = RocksDBPriorityQueueSetFactory.DEFAULT_CACHES_SIZE + 42;
- List<Integer> timeStamps = IntStream.range(0, queueSize).boxed().collect(Collectors.toList());
- Collections.shuffle(timeStamps);
- for (Integer timeStamp : timeStamps) {
- TimerHeapInternalTimer<Integer, VoidNamespace> timer = new TimerHeapInternalTimer<>(timeStamp, timeStamp, VoidNamespace.INSTANCE);
- priorityQueue.add(timer);
- expectedPriorityQueue.add(timer);
- }
- assertEquals(queueSize, priorityQueue.size());
- TimerHeapInternalTimer<Integer, VoidNamespace> timer;
- while ((timer = priorityQueue.poll()) != null) {
- assertEquals(expectedPriorityQueue.poll(), timer);
+ RocksDBKeyedStateBackend<Integer> keyedStateBackend =
+ createKeyedStateBackend(rocksDBStateBackend, new MockEnvironmentBuilder().build(), IntSerializer.INSTANCE);
+ try {
+ KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<Integer, VoidNamespace>> priorityQueue =
+ keyedStateBackend.create("timer", new TimerSerializer<>(keyedStateBackend.getKeySerializer(), VoidNamespaceSerializer.INSTANCE));
+
+ PriorityQueue<TimerHeapInternalTimer<Integer, VoidNamespace>> expectedPriorityQueue = new PriorityQueue<>((o1, o2) -> (int) (o1.getTimestamp() - o2.getTimestamp()));
+ // ensure we insert timers more than cache capacity.
+ int queueSize = RocksDBPriorityQueueSetFactory.DEFAULT_CACHES_SIZE + 42;
+ List<Integer> timeStamps = IntStream.range(0, queueSize).boxed().collect(Collectors.toList());
+ Collections.shuffle(timeStamps);
+ for (Integer timeStamp : timeStamps) {
+ TimerHeapInternalTimer<Integer, VoidNamespace> timer = new TimerHeapInternalTimer<>(timeStamp, timeStamp, VoidNamespace.INSTANCE);
+ priorityQueue.add(timer);
+ expectedPriorityQueue.add(timer);
+ }
+ assertEquals(queueSize, priorityQueue.size());
+ TimerHeapInternalTimer<Integer, VoidNamespace> timer;
+ while ((timer = priorityQueue.poll()) != null) {
+ assertEquals(expectedPriorityQueue.poll(), timer);
+ }
+ assertTrue(expectedPriorityQueue.isEmpty());
+ assertTrue(priorityQueue.isEmpty());
+ } finally {
+ keyedStateBackend.dispose();
}
- assertTrue(expectedPriorityQueue.isEmpty());
- assertTrue(priorityQueue.isEmpty());
}