Posted to commits@flink.apache.org by sr...@apache.org on 2018/04/06 10:34:26 UTC

[3/3] flink git commit: [hotfix] Use try-with-resources to ensure RocksIterator is always closed in RocksDBMapState.

[hotfix] Use try-with-resources to ensure RocksIterator is always closed in RocksDBMapState.

This closes #5705.

(cherry picked from commit 7baf764)
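
For reference, a minimal sketch of the pattern this commit applies. RocksJava's RocksIterator wraps a native handle and is AutoCloseable, so try-with-resources guarantees the handle is released on every exit path, including exceptions. The class and method names below are illustrative and not part of the patch.

import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksIterator;

final class IteratorCleanupSketch {

	/** Counts the entries of a column family; the iterator is closed on every exit path. */
	static long countEntries(RocksDB db, ColumnFamilyHandle columnFamily) {
		// try-with-resources releases the RocksIterator's native handle even if
		// any statement inside the block throws.
		try (RocksIterator iterator = db.newIterator(columnFamily)) {
			long count = 0;
			for (iterator.seekToFirst(); iterator.isValid(); iterator.next()) {
				count++;
			}
			return count;
		}
	}
}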


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/764bafdd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/764bafdd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/764bafdd

Branch: refs/heads/release-1.5
Commit: 764bafdd44b8c996c7d6e9902c28baff35188b98
Parents: cbad9cf
Author: sihuazhou <su...@163.com>
Authored: Tue Mar 27 18:43:55 2018 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Fri Apr 6 12:33:50 2018 +0200

----------------------------------------------------------------------
 .../state/RocksDBKeyedStateBackend.java         | 16 +++--
 .../streaming/state/RocksDBMapState.java        | 71 ++++++++++----------
 2 files changed, 47 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/764bafdd/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 31b9d99..3000667 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -1313,10 +1313,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		private static final List<Comparator<MergeIterator>> COMPARATORS;
 
 		static {
-			int maxBytes = 4;
+			int maxBytes = 2;
 			COMPARATORS = new ArrayList<>(maxBytes);
 			for (int i = 0; i < maxBytes; ++i) {
-				final int currentBytes = i;
+				final int currentBytes = i + 1;
 				COMPARATORS.add(new Comparator<MergeIterator>() {
 					@Override
 					public int compare(MergeIterator o1, MergeIterator o2) {
@@ -1330,9 +1330,11 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 		RocksDBMergeIterator(List<Tuple2<RocksIterator, Integer>> kvStateIterators, final int keyGroupPrefixByteCount) {
 			Preconditions.checkNotNull(kvStateIterators);
+			Preconditions.checkArgument(keyGroupPrefixByteCount >= 1);
+
 			this.keyGroupPrefixByteCount = keyGroupPrefixByteCount;
 
-			Comparator<MergeIterator> iteratorComparator = COMPARATORS.get(keyGroupPrefixByteCount);
+			Comparator<MergeIterator> iteratorComparator = COMPARATORS.get(keyGroupPrefixByteCount - 1);
 
 			if (kvStateIterators.size() > 0) {
 				PriorityQueue<MergeIterator> iteratorPriorityQueue =
@@ -1837,10 +1839,14 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		private Snapshot snapshot;
 		private ReadOptions readOptions;
 
-		/** The state meta data. */
+		/**
+		 * The state meta data.
+		 */
 		private List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots;
 
-		/** The copied column handle. */
+		/**
+		 * The copied column handle.
+		 */
 		private List<ColumnFamilyHandle> copiedColumnFamilyHandles;
 
 		private List<Tuple2<RocksIterator, Integer>> kvStateIterators;
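
A small, self-contained sketch of the indexing change in the hunk above (types are simplified; the real comparators order MergeIterator instances by their current key bytes): the list holds one comparator per supported key-group prefix length, 1 or 2 bytes, so the 1-based prefix byte count must be mapped to a 0-based list index.

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class PrefixComparatorIndexSketch {

	/** One comparator per supported key-group prefix length in bytes (1 or 2). */
	private static final List<Comparator<byte[]>> COMPARATORS;

	static {
		int maxBytes = 2;
		COMPARATORS = new ArrayList<>(maxBytes);
		for (int i = 0; i < maxBytes; ++i) {
			// the comparator at index i compares (i + 1) leading bytes
			final int currentBytes = i + 1;
			COMPARATORS.add((a, b) -> {
				for (int pos = 0; pos < currentBytes; ++pos) {
					int diff = (a[pos] & 0xFF) - (b[pos] & 0xFF);
					if (diff != 0) {
						return diff;
					}
				}
				return 0;
			});
		}
	}

	/** The prefix byte count is 1-based, the list is 0-based, hence the "- 1". */
	static Comparator<byte[]> forPrefixByteCount(int keyGroupPrefixByteCount) {
		if (keyGroupPrefixByteCount < 1 || keyGroupPrefixByteCount > COMPARATORS.size()) {
			throw new IllegalArgumentException("prefix byte count must be 1 or 2");
		}
		return COMPARATORS.get(keyGroupPrefixByteCount - 1);
	}
}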

http://git-wip-us.apache.org/repos/asf/flink/blob/764bafdd/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
index c75a2ed..baa90fa 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
@@ -540,52 +540,53 @@ public class RocksDBMapState<K, N, UK, UV>
 				return;
 			}
 
-			RocksIterator iterator = db.newIterator(columnFamily);
-
-			/*
-			 * The iteration starts from the prefix bytes at the first loading. The cache then is
-			 * reloaded when the next entry to return is the last one in the cache. At that time,
-			 * we will start the iterating from the last returned entry.
- 			 */
-			RocksDBMapEntry lastEntry = cacheEntries.size() == 0 ? null : cacheEntries.get(cacheEntries.size() - 1);
-			byte[] startBytes = (lastEntry == null ? keyPrefixBytes : lastEntry.rawKeyBytes);
-
-			cacheEntries.clear();
-			cacheIndex = 0;
-
-			iterator.seek(startBytes);
-
-			/*
-			 * If the last returned entry is not deleted, it will be the first entry in the
-			 * iterating. Skip it to avoid redundant access in such cases.
-			 */
-			if (lastEntry != null && !lastEntry.deleted) {
-				iterator.next();
-			}
-
-			while (true) {
-				if (!iterator.isValid() || !underSameKey(iterator.key())) {
-					expired = true;
-					break;
+			// use try-with-resources to ensure the RocksIterator is always released, even if a runtime exception
+			// is thrown in the code block below.
+			try (RocksIterator iterator = db.newIterator(columnFamily)) {
+
+				/*
+				 * The iteration starts from the prefix bytes at the first loading. The cache then is
+				 * reloaded when the next entry to return is the last one in the cache. At that time,
+				 * we will start the iterating from the last returned entry.
+ 				 */
+				RocksDBMapEntry lastEntry = cacheEntries.size() == 0 ? null : cacheEntries.get(cacheEntries.size() - 1);
+				byte[] startBytes = (lastEntry == null ? keyPrefixBytes : lastEntry.rawKeyBytes);
+
+				cacheEntries.clear();
+				cacheIndex = 0;
+
+				iterator.seek(startBytes);
+
+				/*
+				 * If the last returned entry is not deleted, it will be the first entry in the
+				 * iterating. Skip it to avoid redundant access in such cases.
+				 */
+				if (lastEntry != null && !lastEntry.deleted) {
+					iterator.next();
 				}
 
-				if (cacheEntries.size() >= CACHE_SIZE_LIMIT) {
-					break;
-				}
+				while (true) {
+					if (!iterator.isValid() || !underSameKey(iterator.key())) {
+						expired = true;
+						break;
+					}
 
-				RocksDBMapEntry entry = new RocksDBMapEntry(
+					if (cacheEntries.size() >= CACHE_SIZE_LIMIT) {
+						break;
+					}
+
+					RocksDBMapEntry entry = new RocksDBMapEntry(
 						db,
 						iterator.key(),
 						iterator.value(),
 						keySerializer,
 						valueSerializer);
 
-				cacheEntries.add(entry);
+					cacheEntries.add(entry);
 
-				iterator.next();
+					iterator.next();
+				}
 			}
-
-			iterator.close();
 		}
 
 		private boolean underSameKey(byte[] rawKeyBytes) {
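
Finally, a condensed sketch of the prefix-bounded scan that loadCache now performs inside try-with-resources (the cache bookkeeping, the RocksDBMapEntry type, and the deleted-entry skip are omitted, and the helper below is a simplification of underSameKey): seek to a start key, copy entries while they still share the prefix, stop at a batch limit, and let the try block close the iterator.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksIterator;

final class PrefixScanSketch {

	/** Collects up to 'limit' raw key/value pairs whose key starts with 'prefix'. */
	static List<byte[][]> scanPrefix(RocksDB db, ColumnFamilyHandle columnFamily, byte[] prefix, int limit) {
		List<byte[][]> entries = new ArrayList<>(limit);
		// the iterator is closed even if copying keys or values throws
		try (RocksIterator iterator = db.newIterator(columnFamily)) {
			for (iterator.seek(prefix); iterator.isValid(); iterator.next()) {
				byte[] key = iterator.key();
				if (!startsWith(key, prefix) || entries.size() >= limit) {
					// left the prefix range, or filled one cache-sized batch
					break;
				}
				entries.add(new byte[][] {key, iterator.value()});
			}
		}
		return entries;
	}

	private static boolean startsWith(byte[] key, byte[] prefix) {
		return key.length >= prefix.length
			&& Arrays.equals(Arrays.copyOf(key, prefix.length), prefix);
	}
}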