You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2018/04/06 10:34:26 UTC
[3/3] flink git commit: [hotfix] Use try-with-resources to ensure
RocksIterator is always closed in RocksDBMapState.
[hotfix] Use try-with-resources to ensure RocksIterator is always closed in RocksDBMapState.
This closes #5705.
(cherry picked from commit 7baf764)
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/764bafdd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/764bafdd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/764bafdd
Branch: refs/heads/release-1.5
Commit: 764bafdd44b8c996c7d6e9902c28baff35188b98
Parents: cbad9cf
Author: sihuazhou <su...@163.com>
Authored: Tue Mar 27 18:43:55 2018 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Fri Apr 6 12:33:50 2018 +0200
----------------------------------------------------------------------
.../state/RocksDBKeyedStateBackend.java | 16 +++--
.../streaming/state/RocksDBMapState.java | 71 ++++++++++----------
2 files changed, 47 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/764bafdd/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 31b9d99..3000667 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -1313,10 +1313,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
private static final List<Comparator<MergeIterator>> COMPARATORS;
static {
- int maxBytes = 4;
+ int maxBytes = 2;
COMPARATORS = new ArrayList<>(maxBytes);
for (int i = 0; i < maxBytes; ++i) {
- final int currentBytes = i;
+ final int currentBytes = i + 1;
COMPARATORS.add(new Comparator<MergeIterator>() {
@Override
public int compare(MergeIterator o1, MergeIterator o2) {
@@ -1330,9 +1330,11 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
RocksDBMergeIterator(List<Tuple2<RocksIterator, Integer>> kvStateIterators, final int keyGroupPrefixByteCount) {
Preconditions.checkNotNull(kvStateIterators);
+ Preconditions.checkArgument(keyGroupPrefixByteCount >= 1);
+
this.keyGroupPrefixByteCount = keyGroupPrefixByteCount;
- Comparator<MergeIterator> iteratorComparator = COMPARATORS.get(keyGroupPrefixByteCount);
+ Comparator<MergeIterator> iteratorComparator = COMPARATORS.get(keyGroupPrefixByteCount - 1);
if (kvStateIterators.size() > 0) {
PriorityQueue<MergeIterator> iteratorPriorityQueue =
@@ -1837,10 +1839,14 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
private Snapshot snapshot;
private ReadOptions readOptions;
- /** The state meta data. */
+ /**
+ * The state meta data.
+ */
private List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots;
- /** The copied column handle. */
+ /**
+ * The copied column handle.
+ */
private List<ColumnFamilyHandle> copiedColumnFamilyHandles;
private List<Tuple2<RocksIterator, Integer>> kvStateIterators;
http://git-wip-us.apache.org/repos/asf/flink/blob/764bafdd/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
index c75a2ed..baa90fa 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
@@ -540,52 +540,53 @@ public class RocksDBMapState<K, N, UK, UV>
return;
}
- RocksIterator iterator = db.newIterator(columnFamily);
-
- /*
- * The iteration starts from the prefix bytes at the first loading. The cache then is
- * reloaded when the next entry to return is the last one in the cache. At that time,
- * we will start the iterating from the last returned entry.
- */
- RocksDBMapEntry lastEntry = cacheEntries.size() == 0 ? null : cacheEntries.get(cacheEntries.size() - 1);
- byte[] startBytes = (lastEntry == null ? keyPrefixBytes : lastEntry.rawKeyBytes);
-
- cacheEntries.clear();
- cacheIndex = 0;
-
- iterator.seek(startBytes);
-
- /*
- * If the last returned entry is not deleted, it will be the first entry in the
- * iterating. Skip it to avoid redundant access in such cases.
- */
- if (lastEntry != null && !lastEntry.deleted) {
- iterator.next();
- }
-
- while (true) {
- if (!iterator.isValid() || !underSameKey(iterator.key())) {
- expired = true;
- break;
+ // Use try-with-resources to ensure the RocksIterator is released even if a runtime exception
+ // occurs in the code block below.
+ try (RocksIterator iterator = db.newIterator(columnFamily)) {
+
+ /*
+ * The iteration starts from the prefix bytes at the first loading. The cache then is
+ * reloaded when the next entry to return is the last one in the cache. At that time,
+ * we will start the iterating from the last returned entry.
+ */
+ RocksDBMapEntry lastEntry = cacheEntries.size() == 0 ? null : cacheEntries.get(cacheEntries.size() - 1);
+ byte[] startBytes = (lastEntry == null ? keyPrefixBytes : lastEntry.rawKeyBytes);
+
+ cacheEntries.clear();
+ cacheIndex = 0;
+
+ iterator.seek(startBytes);
+
+ /*
+ * If the last returned entry is not deleted, it will be the first entry in the
+ * iterating. Skip it to avoid redundant access in such cases.
+ */
+ if (lastEntry != null && !lastEntry.deleted) {
+ iterator.next();
}
- if (cacheEntries.size() >= CACHE_SIZE_LIMIT) {
- break;
- }
+ while (true) {
+ if (!iterator.isValid() || !underSameKey(iterator.key())) {
+ expired = true;
+ break;
+ }
- RocksDBMapEntry entry = new RocksDBMapEntry(
+ if (cacheEntries.size() >= CACHE_SIZE_LIMIT) {
+ break;
+ }
+
+ RocksDBMapEntry entry = new RocksDBMapEntry(
db,
iterator.key(),
iterator.value(),
keySerializer,
valueSerializer);
- cacheEntries.add(entry);
+ cacheEntries.add(entry);
- iterator.next();
+ iterator.next();
+ }
}
-
- iterator.close();
}
private boolean underSameKey(byte[] rawKeyBytes) {