You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2017/05/14 11:50:12 UTC
[5/5] flink git commit: [FLINK-6504] [checkpoint] Fix synchronization on materializedSstFiles in RocksDBKeyedStateBackend
[FLINK-6504] [checkpoint] Fix synchronization on materializedSstFiles in RocksDBKeyedStateBackend
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/958773b7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/958773b7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/958773b7
Branch: refs/heads/master
Commit: 958773b71c52aae94560508f8d4cd894059d4467
Parents: 4745d0c
Author: Stefan Richter <s....@data-artisans.com>
Authored: Thu May 11 11:59:47 2017 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Sun May 14 13:49:50 2017 +0200
----------------------------------------------------------------------
.../contrib/streaming/state/RocksDBKeyedStateBackend.java | 7 ++++++-
1 file changed, 6 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/958773b7/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 1080e59..b9468f7 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -828,9 +828,12 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
}
void takeSnapshot() throws Exception {
+ assert (Thread.holdsLock(stateBackend.asyncSnapshotLock));
+
// use the last completed checkpoint as the comparison base.
baseSstFiles = stateBackend.materializedSstFiles.get(stateBackend.lastCompletedCheckpointId);
+
// save meta data
for (Map.Entry<String, Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>> stateMetaInfoEntry
: stateBackend.kvStateInformation.entrySet()) {
@@ -888,7 +891,9 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
sstFiles.putAll(newSstFiles);
sstFiles.putAll(oldSstFiles);
- stateBackend.materializedSstFiles.put(checkpointId, sstFiles);
+ synchronized (stateBackend.asyncSnapshotLock) {
+ stateBackend.materializedSstFiles.put(checkpointId, sstFiles);
+ }
return new RocksDBIncrementalKeyedStateHandle(
stateBackend.operatorIdentifier,