You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/12/14 16:58:04 UTC

[5/5] flink git commit: [FLINK-5283] Fix closing streams when restoring old savepoint in keyed backends

[FLINK-5283] Fix closing streams when restoring old savepoint in keyed backends


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/35f4ea78
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/35f4ea78
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/35f4ea78

Branch: refs/heads/master
Commit: 35f4ea787c55eceede5154fc1ff23c70cdc522b4
Parents: bf2874e
Author: Stefan Richter <s....@data-artisans.com>
Authored: Wed Dec 7 21:23:35 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed Dec 14 17:50:51 2016 +0100

----------------------------------------------------------------------
 .../contrib/streaming/state/RocksDBKeyedStateBackend.java    | 8 +++++---
 .../flink/runtime/state/heap/HeapKeyedStateBackend.java      | 6 ++++--
 2 files changed, 9 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/35f4ea78/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 8637f6b..5fef5e5 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -1090,8 +1090,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		}
 
 		Preconditions.checkState(1 == restoreState.size(), "Only one element expected here.");
-		HashMap<String, RocksDBStateBackend.FinalFullyAsyncSnapshot> namedStates =
-				InstantiationUtil.deserializeObject(restoreState.iterator().next().openInputStream(), userCodeClassLoader);
+		HashMap<String, RocksDBStateBackend.FinalFullyAsyncSnapshot> namedStates;
+		try (FSDataInputStream inputStream = restoreState.iterator().next().openInputStream()) {
+			namedStates = InstantiationUtil.deserializeObject(inputStream, userCodeClassLoader);
+		}
 
 		Preconditions.checkState(1 == namedStates.size(), "Only one element expected here.");
 		DataInputView inputView = namedStates.values().iterator().next().stateHandle.getState(userCodeClassLoader);
@@ -1101,7 +1103,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 		// first get the column family mapping
 		int numColumns = inputView.readInt();
-		Map<Byte, StateDescriptor> columnFamilyMapping = new HashMap<>(numColumns);
+		Map<Byte, StateDescriptor<?, ?>> columnFamilyMapping = new HashMap<>(numColumns);
 		for (int i = 0; i < numColumns; i++) {
 			byte mappingByte = inputView.readByte();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/35f4ea78/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index aab2ee5..6e85b72 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -378,8 +378,10 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 		Preconditions.checkState(1 == stateHandles.size(), "Only one element expected here.");
 
-		HashMap<String, KvStateSnapshot<K, ?, ?, ?>> namedStates =
-				InstantiationUtil.deserializeObject(stateHandles.iterator().next().openInputStream(), userCodeClassLoader);
+		HashMap<String, KvStateSnapshot<K, ?, ?, ?>> namedStates;
+		try (FSDataInputStream inputStream = stateHandles.iterator().next().openInputStream()) {
+			namedStates = InstantiationUtil.deserializeObject(inputStream, userCodeClassLoader);
+		}
 
 		for (Map.Entry<String, KvStateSnapshot<K, ?, ?, ?>> nameToState : namedStates.entrySet()) {