You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by li...@apache.org on 2020/06/26 14:34:03 UTC

[flink] 02/03: [hot-fix][rocksdb] Ensure RocksDBKeyedStateBackend disposed at RocksDBStateMisuseOptionTest

This is an automated email from the ASF dual-hosted git repository.

liyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 11d45135d85937edd16fb4f8f94ba71f5f794626
Author: Yun Tang <my...@live.com>
AuthorDate: Mon Jun 22 16:27:02 2020 +0800

    [hot-fix][rocksdb] Ensure RocksDBKeyedStateBackend disposed at RocksDBStateMisuseOptionTest
    
    This closes #12736.
---
 .../state/RocksDBStateMisuseOptionTest.java        | 86 ++++++++++++----------
 1 file changed, 48 insertions(+), 38 deletions(-)

diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateMisuseOptionTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateMisuseOptionTest.java
index 59a4822..20e2906 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateMisuseOptionTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateMisuseOptionTest.java
@@ -71,26 +71,31 @@ public class RocksDBStateMisuseOptionTest {
 	@Test
 	public void testMisuseOptimizePointLookupWithMapState() throws Exception {
 		RocksDBStateBackend rocksDBStateBackend = createStateBackendWithOptimizePointLookup();
-		RocksDBKeyedStateBackend<Integer> keyedStateBackend = createKeyedStateBackend(rocksDBStateBackend, new MockEnvironmentBuilder().build(), IntSerializer.INSTANCE);
-		MapStateDescriptor<Integer, Long> stateDescriptor = new MapStateDescriptor<>("map", IntSerializer.INSTANCE, LongSerializer.INSTANCE);
-		MapState<Integer, Long> mapState = keyedStateBackend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, stateDescriptor);
-
-		keyedStateBackend.setCurrentKey(1);
-		Map<Integer, Long> expectedResult = new HashMap<>();
-		for (int i = 0; i < 100; i++) {
-			long uv = ThreadLocalRandom.current().nextLong();
-			mapState.put(i, uv);
-			expectedResult.put(i, uv);
-		}
+		RocksDBKeyedStateBackend<Integer> keyedStateBackend =
+			createKeyedStateBackend(rocksDBStateBackend, new MockEnvironmentBuilder().build(), IntSerializer.INSTANCE);
+		try {
+			MapStateDescriptor<Integer, Long> stateDescriptor = new MapStateDescriptor<>("map", IntSerializer.INSTANCE, LongSerializer.INSTANCE);
+			MapState<Integer, Long> mapState = keyedStateBackend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, stateDescriptor);
+
+			keyedStateBackend.setCurrentKey(1);
+			Map<Integer, Long> expectedResult = new HashMap<>();
+			for (int i = 0; i < 100; i++) {
+				long uv = ThreadLocalRandom.current().nextLong();
+				mapState.put(i, uv);
+				expectedResult.put(i, uv);
+			}
 
-		Iterator<Map.Entry<Integer, Long>> iterator = mapState.entries().iterator();
-		while (iterator.hasNext()) {
-			Map.Entry<Integer, Long> entry = iterator.next();
-			assertEquals(entry.getValue(), expectedResult.remove(entry.getKey()));
-			iterator.remove();
+			Iterator<Map.Entry<Integer, Long>> iterator = mapState.entries().iterator();
+			while (iterator.hasNext()) {
+				Map.Entry<Integer, Long> entry = iterator.next();
+				assertEquals(entry.getValue(), expectedResult.remove(entry.getKey()));
+				iterator.remove();
+			}
+			assertTrue(expectedResult.isEmpty());
+			assertTrue(mapState.isEmpty());
+		} finally {
+			keyedStateBackend.dispose();
 		}
-		assertTrue(expectedResult.isEmpty());
-		assertTrue(mapState.isEmpty());
 	}
 
 	/**
@@ -101,27 +106,32 @@ public class RocksDBStateMisuseOptionTest {
 	@Test
 	public void testMisuseOptimizePointLookupWithPriorityQueue() throws IOException {
 		RocksDBStateBackend rocksDBStateBackend = createStateBackendWithOptimizePointLookup();
-		RocksDBKeyedStateBackend<Integer> keyedStateBackend = createKeyedStateBackend(rocksDBStateBackend, new MockEnvironmentBuilder().build(), IntSerializer.INSTANCE);
-		KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<Integer, VoidNamespace>> priorityQueue =
-			keyedStateBackend.create("timer", new TimerSerializer<>(keyedStateBackend.getKeySerializer(), VoidNamespaceSerializer.INSTANCE));
-
-		PriorityQueue<TimerHeapInternalTimer<Integer, VoidNamespace>> expectedPriorityQueue = new PriorityQueue<>((o1, o2) -> (int) (o1.getTimestamp() - o2.getTimestamp()));
-		// ensure we insert timers more than cache capacity.
-		int queueSize = RocksDBPriorityQueueSetFactory.DEFAULT_CACHES_SIZE + 42;
-		List<Integer> timeStamps = IntStream.range(0, queueSize).boxed().collect(Collectors.toList());
-		Collections.shuffle(timeStamps);
-		for (Integer timeStamp : timeStamps) {
-			TimerHeapInternalTimer<Integer, VoidNamespace> timer = new TimerHeapInternalTimer<>(timeStamp, timeStamp, VoidNamespace.INSTANCE);
-			priorityQueue.add(timer);
-			expectedPriorityQueue.add(timer);
-		}
-		assertEquals(queueSize, priorityQueue.size());
-		TimerHeapInternalTimer<Integer, VoidNamespace> timer;
-		while ((timer = priorityQueue.poll()) != null) {
-			assertEquals(expectedPriorityQueue.poll(), timer);
+		RocksDBKeyedStateBackend<Integer> keyedStateBackend =
+			createKeyedStateBackend(rocksDBStateBackend, new MockEnvironmentBuilder().build(), IntSerializer.INSTANCE);
+		try {
+			KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<Integer, VoidNamespace>> priorityQueue =
+				keyedStateBackend.create("timer", new TimerSerializer<>(keyedStateBackend.getKeySerializer(), VoidNamespaceSerializer.INSTANCE));
+
+			PriorityQueue<TimerHeapInternalTimer<Integer, VoidNamespace>> expectedPriorityQueue = new PriorityQueue<>((o1, o2) -> (int) (o1.getTimestamp() - o2.getTimestamp()));
+			// ensure we insert timers more than cache capacity.
+			int queueSize = RocksDBPriorityQueueSetFactory.DEFAULT_CACHES_SIZE + 42;
+			List<Integer> timeStamps = IntStream.range(0, queueSize).boxed().collect(Collectors.toList());
+			Collections.shuffle(timeStamps);
+			for (Integer timeStamp : timeStamps) {
+				TimerHeapInternalTimer<Integer, VoidNamespace> timer = new TimerHeapInternalTimer<>(timeStamp, timeStamp, VoidNamespace.INSTANCE);
+				priorityQueue.add(timer);
+				expectedPriorityQueue.add(timer);
+			}
+			assertEquals(queueSize, priorityQueue.size());
+			TimerHeapInternalTimer<Integer, VoidNamespace> timer;
+			while ((timer = priorityQueue.poll()) != null) {
+				assertEquals(expectedPriorityQueue.poll(), timer);
+			}
+			assertTrue(expectedPriorityQueue.isEmpty());
+			assertTrue(priorityQueue.isEmpty());
+		} finally {
+			keyedStateBackend.dispose();
 		}
-		assertTrue(expectedPriorityQueue.isEmpty());
-		assertTrue(priorityQueue.isEmpty());
 
 	}