You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2018/04/06 10:32:20 UTC
[1/3] flink git commit: [hotfix] Use try-with-resources to ensure
RocksIterator is always closed in RocksDBMapState.
Repository: flink
Updated Branches:
refs/heads/master 2d872447d -> 7baf7649e
[hotfix] Use try-with-resources to ensure RocksIterator is always closed in RocksDBMapState.
This closes #5705.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7baf7649
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7baf7649
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7baf7649
Branch: refs/heads/master
Commit: 7baf7649e9ecd485d6f036b7755c2f98cca74e3a
Parents: 21cf59d
Author: sihuazhou <su...@163.com>
Authored: Wed Mar 28 00:43:55 2018 +0800
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Fri Apr 6 12:32:08 2018 +0200
----------------------------------------------------------------------
.../state/RocksDBKeyedStateBackend.java | 16 +++--
.../streaming/state/RocksDBMapState.java | 71 ++++++++++----------
2 files changed, 47 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/7baf7649/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 31b9d99..3000667 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -1313,10 +1313,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
private static final List<Comparator<MergeIterator>> COMPARATORS;
static {
- int maxBytes = 4;
+ int maxBytes = 2;
COMPARATORS = new ArrayList<>(maxBytes);
for (int i = 0; i < maxBytes; ++i) {
- final int currentBytes = i;
+ final int currentBytes = i + 1;
COMPARATORS.add(new Comparator<MergeIterator>() {
@Override
public int compare(MergeIterator o1, MergeIterator o2) {
@@ -1330,9 +1330,11 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
RocksDBMergeIterator(List<Tuple2<RocksIterator, Integer>> kvStateIterators, final int keyGroupPrefixByteCount) {
Preconditions.checkNotNull(kvStateIterators);
+ Preconditions.checkArgument(keyGroupPrefixByteCount >= 1);
+
this.keyGroupPrefixByteCount = keyGroupPrefixByteCount;
- Comparator<MergeIterator> iteratorComparator = COMPARATORS.get(keyGroupPrefixByteCount);
+ Comparator<MergeIterator> iteratorComparator = COMPARATORS.get(keyGroupPrefixByteCount - 1);
if (kvStateIterators.size() > 0) {
PriorityQueue<MergeIterator> iteratorPriorityQueue =
@@ -1837,10 +1839,14 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
private Snapshot snapshot;
private ReadOptions readOptions;
- /** The state meta data. */
+ /**
+ * The state meta data.
+ */
private List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots;
- /** The copied column handle. */
+ /**
+ * The copied column handle.
+ */
private List<ColumnFamilyHandle> copiedColumnFamilyHandles;
private List<Tuple2<RocksIterator, Integer>> kvStateIterators;
http://git-wip-us.apache.org/repos/asf/flink/blob/7baf7649/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
index c75a2ed..baa90fa 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
@@ -540,52 +540,53 @@ public class RocksDBMapState<K, N, UK, UV>
return;
}
- RocksIterator iterator = db.newIterator(columnFamily);
-
- /*
- * The iteration starts from the prefix bytes at the first loading. The cache then is
- * reloaded when the next entry to return is the last one in the cache. At that time,
- * we will start the iterating from the last returned entry.
- */
- RocksDBMapEntry lastEntry = cacheEntries.size() == 0 ? null : cacheEntries.get(cacheEntries.size() - 1);
- byte[] startBytes = (lastEntry == null ? keyPrefixBytes : lastEntry.rawKeyBytes);
-
- cacheEntries.clear();
- cacheIndex = 0;
-
- iterator.seek(startBytes);
-
- /*
- * If the last returned entry is not deleted, it will be the first entry in the
- * iterating. Skip it to avoid redundant access in such cases.
- */
- if (lastEntry != null && !lastEntry.deleted) {
- iterator.next();
- }
-
- while (true) {
- if (!iterator.isValid() || !underSameKey(iterator.key())) {
- expired = true;
- break;
+ // Use try-with-resources to ensure that the RocksIterator is released even if a runtime
+ // exception occurs in the code block below.
+ try (RocksIterator iterator = db.newIterator(columnFamily)) {
+
+ /*
+ * The iteration starts from the prefix bytes at the first loading. The cache then is
+ * reloaded when the next entry to return is the last one in the cache. At that time,
+ * we will start the iterating from the last returned entry.
+ */
+ RocksDBMapEntry lastEntry = cacheEntries.size() == 0 ? null : cacheEntries.get(cacheEntries.size() - 1);
+ byte[] startBytes = (lastEntry == null ? keyPrefixBytes : lastEntry.rawKeyBytes);
+
+ cacheEntries.clear();
+ cacheIndex = 0;
+
+ iterator.seek(startBytes);
+
+ /*
+ * If the last returned entry is not deleted, it will be the first entry in the
+ * iterating. Skip it to avoid redundant access in such cases.
+ */
+ if (lastEntry != null && !lastEntry.deleted) {
+ iterator.next();
}
- if (cacheEntries.size() >= CACHE_SIZE_LIMIT) {
- break;
- }
+ while (true) {
+ if (!iterator.isValid() || !underSameKey(iterator.key())) {
+ expired = true;
+ break;
+ }
- RocksDBMapEntry entry = new RocksDBMapEntry(
+ if (cacheEntries.size() >= CACHE_SIZE_LIMIT) {
+ break;
+ }
+
+ RocksDBMapEntry entry = new RocksDBMapEntry(
db,
iterator.key(),
iterator.value(),
keySerializer,
valueSerializer);
- cacheEntries.add(entry);
+ cacheEntries.add(entry);
- iterator.next();
+ iterator.next();
+ }
}
-
- iterator.close();
}
private boolean underSameKey(byte[] rawKeyBytes) {
[2/3] flink git commit: [FLINK-8699][state] Deep copy state info to
avoid potential concurrency problem in full checkpoint.
Posted by sr...@apache.org.
[FLINK-8699][state] Deep copy state info to avoid potential concurrency problem in full checkpoint.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/21cf59d5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/21cf59d5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/21cf59d5
Branch: refs/heads/master
Commit: 21cf59d5fffdca9e8335e1990c75e0c3cd684212
Parents: f5071d7
Author: sihuazhou <su...@163.com>
Authored: Fri Mar 16 23:07:54 2018 +0800
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Fri Apr 6 12:32:08 2018 +0200
----------------------------------------------------------------------
.../state/RocksDBKeyedStateBackend.java | 36 +++++++++++++-------
1 file changed, 24 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/21cf59d5/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index cdeb608..31b9d99 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -1836,7 +1836,13 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
private Snapshot snapshot;
private ReadOptions readOptions;
- private List<Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>> kvStateInformationCopy;
+
+ /** The state meta data. */
+ private List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots;
+
+ /** The copied column handle. */
+ private List<ColumnFamilyHandle> copiedColumnFamilyHandles;
+
private List<Tuple2<RocksIterator, Integer>> kvStateIterators;
private CheckpointStreamWithResultProvider checkpointStreamWithResultProvider;
@@ -1860,7 +1866,19 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
*/
public void takeDBSnapShot() {
Preconditions.checkArgument(snapshot == null, "Only one ongoing snapshot allowed!");
- this.kvStateInformationCopy = new ArrayList<>(stateBackend.kvStateInformation.values());
+
+ this.stateMetaInfoSnapshots = new ArrayList<>(stateBackend.kvStateInformation.size());
+
+ this.copiedColumnFamilyHandles = new ArrayList<>(stateBackend.kvStateInformation.size());
+
+ for (Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> tuple2 :
+ stateBackend.kvStateInformation.values()) {
+ // snapshot meta info
+ this.stateMetaInfoSnapshots.add(tuple2.f1.snapshot());
+
+ // copy column family handle
+ this.copiedColumnFamilyHandles.add(tuple2.f0);
+ }
this.snapshot = stateBackend.db.getSnapshot();
}
@@ -1946,10 +1964,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
private void writeKVStateMetaData() throws IOException {
- List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> metaInfoSnapshots =
- new ArrayList<>(kvStateInformationCopy.size());
-
- this.kvStateIterators = new ArrayList<>(kvStateInformationCopy.size());
+ this.kvStateIterators = new ArrayList<>(copiedColumnFamilyHandles.size());
int kvStateId = 0;
@@ -1957,13 +1972,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
readOptions = new ReadOptions();
readOptions.setSnapshot(snapshot);
- for (Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> column :
- kvStateInformationCopy) {
-
- metaInfoSnapshots.add(column.f1.snapshot());
+ for (ColumnFamilyHandle columnFamilyHandle : copiedColumnFamilyHandles) {
kvStateIterators.add(
- new Tuple2<>(stateBackend.db.newIterator(column.f0, readOptions), kvStateId));
+ new Tuple2<>(stateBackend.db.newIterator(columnFamilyHandle, readOptions), kvStateId));
++kvStateId;
}
@@ -1971,7 +1983,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
KeyedBackendSerializationProxy<K> serializationProxy =
new KeyedBackendSerializationProxy<>(
stateBackend.getKeySerializer(),
- metaInfoSnapshots,
+ stateMetaInfoSnapshots,
!Objects.equals(
UncompressedStreamCompressionDecorator.INSTANCE,
stateBackend.keyGroupCompressionDecorator));
[3/3] flink git commit: [FLINK-8968][state] Pull the creation of
readOptions out of loop to avoid native resource leak.
Posted by sr...@apache.org.
[FLINK-8968][state] Pull the creation of readOptions out of loop to avoid native resource leak.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f5071d7a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f5071d7a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f5071d7a
Branch: refs/heads/master
Commit: f5071d7a6128f78e7f0ab7ccd8813d30543fb3a2
Parents: 2d87244
Author: sihuazhou <su...@163.com>
Authored: Thu Mar 15 23:57:11 2018 +0800
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Fri Apr 6 12:32:08 2018 +0200
----------------------------------------------------------------------
.../contrib/streaming/state/RocksDBKeyedStateBackend.java | 9 +++++----
.../streaming/state/RocksDBStateBackendConfigTest.java | 8 ++++++--
2 files changed, 11 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f5071d7a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 6a23181..cdeb608 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -1952,15 +1952,16 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
this.kvStateIterators = new ArrayList<>(kvStateInformationCopy.size());
int kvStateId = 0;
+
+ // retrieve the iterators for the k/v states
+ readOptions = new ReadOptions();
+ readOptions.setSnapshot(snapshot);
+
for (Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> column :
kvStateInformationCopy) {
metaInfoSnapshots.add(column.f1.snapshot());
- //retrieve iterator for this k/v states
- readOptions = new ReadOptions();
- readOptions.setSnapshot(snapshot);
-
kvStateIterators.add(
new Tuple2<>(stateBackend.db.newIterator(column.f0, readOptions), kvStateId));
http://git-wip-us.apache.org/repos/asf/flink/blob/f5071d7a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
index 2dd67f5..65d5b2e 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
@@ -299,7 +299,9 @@ public class RocksDBStateBackendConfigTest {
});
assertNotNull(rocksDbBackend.getOptions());
- assertEquals(CompactionStyle.FIFO, rocksDbBackend.getColumnOptions().compactionStyle());
+ try (ColumnFamilyOptions colCreated = rocksDbBackend.getColumnOptions()) {
+ assertEquals(CompactionStyle.FIFO, colCreated.compactionStyle());
+ }
}
@Test
@@ -324,7 +326,9 @@ public class RocksDBStateBackendConfigTest {
assertEquals(PredefinedOptions.SPINNING_DISK_OPTIMIZED, rocksDbBackend.getPredefinedOptions());
assertNotNull(rocksDbBackend.getOptions());
- assertEquals(CompactionStyle.UNIVERSAL, rocksDbBackend.getColumnOptions().compactionStyle());
+ try (ColumnFamilyOptions colCreated = rocksDbBackend.getColumnOptions()) {
+ assertEquals(CompactionStyle.UNIVERSAL, colCreated.compactionStyle());
+ }
}
@Test