You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2017/05/14 11:50:12 UTC

[5/5] flink git commit: [FLINK-6504] [checkpoint] Fix synchronization on materializedSstFiles in RocksDBKeyedStateBackend

[FLINK-6504] [checkpoint] Fix synchronization on materializedSstFiles in RocksDBKeyedStateBackend


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/958773b7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/958773b7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/958773b7

Branch: refs/heads/master
Commit: 958773b71c52aae94560508f8d4cd894059d4467
Parents: 4745d0c
Author: Stefan Richter <s....@data-artisans.com>
Authored: Thu May 11 11:59:47 2017 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Sun May 14 13:49:50 2017 +0200

----------------------------------------------------------------------
 .../contrib/streaming/state/RocksDBKeyedStateBackend.java     | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/958773b7/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 1080e59..b9468f7 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -828,9 +828,12 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		}
 
 		void takeSnapshot() throws Exception {
+			assert (Thread.holdsLock(stateBackend.asyncSnapshotLock));
+
 			// use the last completed checkpoint as the comparison base.
 			baseSstFiles = stateBackend.materializedSstFiles.get(stateBackend.lastCompletedCheckpointId);
 
+
 			// save meta data
 			for (Map.Entry<String, Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>> stateMetaInfoEntry
 					: stateBackend.kvStateInformation.entrySet()) {
@@ -888,7 +891,9 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			sstFiles.putAll(newSstFiles);
 			sstFiles.putAll(oldSstFiles);
 
-			stateBackend.materializedSstFiles.put(checkpointId, sstFiles);
+			synchronized (stateBackend.asyncSnapshotLock) {
+				stateBackend.materializedSstFiles.put(checkpointId, sstFiles);
+			}
 
 			return new RocksDBIncrementalKeyedStateHandle(
 				stateBackend.operatorIdentifier,