You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2018/04/06 10:32:21 UTC

[2/3] flink git commit: [FLINK-8699][state] Deep copy state info to avoid potential concurrency problem in full checkpoint.

[FLINK-8699][state] Deep copy state info to avoid potential concurrency problem in full checkpoint.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/21cf59d5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/21cf59d5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/21cf59d5

Branch: refs/heads/master
Commit: 21cf59d5fffdca9e8335e1990c75e0c3cd684212
Parents: f5071d7
Author: sihuazhou <su...@163.com>
Authored: Fri Mar 16 23:07:54 2018 +0800
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Fri Apr 6 12:32:08 2018 +0200

----------------------------------------------------------------------
 .../state/RocksDBKeyedStateBackend.java         | 36 +++++++++++++-------
 1 file changed, 24 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/21cf59d5/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index cdeb608..31b9d99 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -1836,7 +1836,13 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 		private Snapshot snapshot;
 		private ReadOptions readOptions;
-		private List<Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>> kvStateInformationCopy;
+
+		/** The state meta data. */
+		private List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots;
+
+		/** The copied column handle. */
+		private List<ColumnFamilyHandle> copiedColumnFamilyHandles;
+
 		private List<Tuple2<RocksIterator, Integer>> kvStateIterators;
 
 		private CheckpointStreamWithResultProvider checkpointStreamWithResultProvider;
@@ -1860,7 +1866,19 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		 */
 		public void takeDBSnapShot() {
 			Preconditions.checkArgument(snapshot == null, "Only one ongoing snapshot allowed!");
-			this.kvStateInformationCopy = new ArrayList<>(stateBackend.kvStateInformation.values());
+
+			this.stateMetaInfoSnapshots = new ArrayList<>(stateBackend.kvStateInformation.size());
+
+			this.copiedColumnFamilyHandles = new ArrayList<>(stateBackend.kvStateInformation.size());
+
+			for (Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> tuple2 :
+				stateBackend.kvStateInformation.values()) {
+				// snapshot meta info
+				this.stateMetaInfoSnapshots.add(tuple2.f1.snapshot());
+
+				// copy column family handle
+				this.copiedColumnFamilyHandles.add(tuple2.f0);
+			}
 			this.snapshot = stateBackend.db.getSnapshot();
 		}
 
@@ -1946,10 +1964,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 		private void writeKVStateMetaData() throws IOException {
 
-			List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> metaInfoSnapshots =
-				new ArrayList<>(kvStateInformationCopy.size());
-
-			this.kvStateIterators = new ArrayList<>(kvStateInformationCopy.size());
+			this.kvStateIterators = new ArrayList<>(copiedColumnFamilyHandles.size());
 
 			int kvStateId = 0;
 
@@ -1957,13 +1972,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			readOptions = new ReadOptions();
 			readOptions.setSnapshot(snapshot);
 
-			for (Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> column :
-				kvStateInformationCopy) {
-
-				metaInfoSnapshots.add(column.f1.snapshot());
+			for (ColumnFamilyHandle columnFamilyHandle : copiedColumnFamilyHandles) {
 
 				kvStateIterators.add(
-					new Tuple2<>(stateBackend.db.newIterator(column.f0, readOptions), kvStateId));
+					new Tuple2<>(stateBackend.db.newIterator(columnFamilyHandle, readOptions), kvStateId));
 
 				++kvStateId;
 			}
@@ -1971,7 +1983,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			KeyedBackendSerializationProxy<K> serializationProxy =
 				new KeyedBackendSerializationProxy<>(
 					stateBackend.getKeySerializer(),
-					metaInfoSnapshots,
+					stateMetaInfoSnapshots,
 					!Objects.equals(
 						UncompressedStreamCompressionDecorator.INSTANCE,
 						stateBackend.keyGroupCompressionDecorator));