You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2017/05/14 12:10:37 UTC

[14/14] flink git commit: [FLINK-6504] [checkpoint] Fix synchronization on materializedSstFiles in RocksDBKeyedStateBackend

[FLINK-6504] [checkpoint] Fix synchronization on materializedSstFiles in RocksDBKeyedStateBackend


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f7d79d8f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f7d79d8f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f7d79d8f

Branch: refs/heads/release-1.3
Commit: f7d79d8fd9e6b2691c75af1214666cc99b5aaca7
Parents: b30b8ee
Author: Stefan Richter <s....@data-artisans.com>
Authored: Thu May 11 11:59:47 2017 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Sun May 14 14:07:26 2017 +0200

----------------------------------------------------------------------
 .../contrib/streaming/state/RocksDBKeyedStateBackend.java     | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f7d79d8f/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 1080e59..b9468f7 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -828,9 +828,12 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		}
 
 		void takeSnapshot() throws Exception {
+			assert (Thread.holdsLock(stateBackend.asyncSnapshotLock));
+
 			// use the last completed checkpoint as the comparison base.
 			baseSstFiles = stateBackend.materializedSstFiles.get(stateBackend.lastCompletedCheckpointId);
 
+
 			// save meta data
 			for (Map.Entry<String, Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>> stateMetaInfoEntry
 					: stateBackend.kvStateInformation.entrySet()) {
@@ -888,7 +891,9 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			sstFiles.putAll(newSstFiles);
 			sstFiles.putAll(oldSstFiles);
 
-			stateBackend.materializedSstFiles.put(checkpointId, sstFiles);
+			synchronized (stateBackend.asyncSnapshotLock) {
+				stateBackend.materializedSstFiles.put(checkpointId, sstFiles);
+			}
 
 			return new RocksDBIncrementalKeyedStateHandle(
 				stateBackend.operatorIdentifier,