You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2018/04/06 10:32:21 UTC
[2/3] flink git commit: [FLINK-8699][state] Deep copy state info to
avoid potential concurrency problem in full checkpoint.
[FLINK-8699][state] Deep copy state info to avoid potential concurrency problem in full checkpoint.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/21cf59d5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/21cf59d5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/21cf59d5
Branch: refs/heads/master
Commit: 21cf59d5fffdca9e8335e1990c75e0c3cd684212
Parents: f5071d7
Author: sihuazhou <su...@163.com>
Authored: Fri Mar 16 23:07:54 2018 +0800
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Fri Apr 6 12:32:08 2018 +0200
----------------------------------------------------------------------
.../state/RocksDBKeyedStateBackend.java | 36 +++++++++++++-------
1 file changed, 24 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/21cf59d5/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index cdeb608..31b9d99 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -1836,7 +1836,13 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
private Snapshot snapshot;
private ReadOptions readOptions;
- private List<Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>> kvStateInformationCopy;
+
+ /** The state meta data. */
+ private List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots;
+
+ /** The copied column handle. */
+ private List<ColumnFamilyHandle> copiedColumnFamilyHandles;
+
private List<Tuple2<RocksIterator, Integer>> kvStateIterators;
private CheckpointStreamWithResultProvider checkpointStreamWithResultProvider;
@@ -1860,7 +1866,19 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
*/
public void takeDBSnapShot() {
Preconditions.checkArgument(snapshot == null, "Only one ongoing snapshot allowed!");
- this.kvStateInformationCopy = new ArrayList<>(stateBackend.kvStateInformation.values());
+
+ this.stateMetaInfoSnapshots = new ArrayList<>(stateBackend.kvStateInformation.size());
+
+ this.copiedColumnFamilyHandles = new ArrayList<>(stateBackend.kvStateInformation.size());
+
+ for (Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> tuple2 :
+ stateBackend.kvStateInformation.values()) {
+ // snapshot meta info
+ this.stateMetaInfoSnapshots.add(tuple2.f1.snapshot());
+
+ // copy column family handle
+ this.copiedColumnFamilyHandles.add(tuple2.f0);
+ }
this.snapshot = stateBackend.db.getSnapshot();
}
@@ -1946,10 +1964,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
private void writeKVStateMetaData() throws IOException {
- List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> metaInfoSnapshots =
- new ArrayList<>(kvStateInformationCopy.size());
-
- this.kvStateIterators = new ArrayList<>(kvStateInformationCopy.size());
+ this.kvStateIterators = new ArrayList<>(copiedColumnFamilyHandles.size());
int kvStateId = 0;
@@ -1957,13 +1972,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
readOptions = new ReadOptions();
readOptions.setSnapshot(snapshot);
- for (Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> column :
- kvStateInformationCopy) {
-
- metaInfoSnapshots.add(column.f1.snapshot());
+ for (ColumnFamilyHandle columnFamilyHandle : copiedColumnFamilyHandles) {
kvStateIterators.add(
- new Tuple2<>(stateBackend.db.newIterator(column.f0, readOptions), kvStateId));
+ new Tuple2<>(stateBackend.db.newIterator(columnFamilyHandle, readOptions), kvStateId));
++kvStateId;
}
@@ -1971,7 +1983,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
KeyedBackendSerializationProxy<K> serializationProxy =
new KeyedBackendSerializationProxy<>(
stateBackend.getKeySerializer(),
- metaInfoSnapshots,
+ stateMetaInfoSnapshots,
!Objects.equals(
UncompressedStreamCompressionDecorator.INSTANCE,
stateBackend.keyGroupCompressionDecorator));