You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2016/08/25 08:36:34 UTC

[2/2] flink git commit: [FLINK-4471] Use user code classloader to deserialize state descriptor in RocksDb backend

[FLINK-4471] Use user code classloader to deserialize state descriptor in RocksDb backend


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/016c381c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/016c381c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/016c381c

Branch: refs/heads/release-1.1
Commit: 016c381cdb5a9d7fdcb03f0d95da1582f6b68783
Parents: a0ffa38
Author: Gyula Fora <gy...@apache.org>
Authored: Wed Aug 24 12:02:51 2016 +0200
Committer: Gyula Fora <gy...@apache.org>
Committed: Wed Aug 24 18:04:34 2016 +0200

----------------------------------------------------------------------
 .../flink/contrib/streaming/state/RocksDBStateBackend.java       | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/016c381c/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index 2c9a5d2..deba9f9 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -61,7 +61,7 @@ import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.runtime.util.SerializableObject;
 import org.apache.flink.streaming.util.HDFSCopyFromLocal;
 import org.apache.flink.streaming.util.HDFSCopyToLocal;
-
+import org.apache.flink.util.InstantiationUtil;
 import org.apache.hadoop.fs.FileSystem;
 
 import org.rocksdb.BackupEngine;
@@ -580,7 +580,7 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 		for (int i = 0; i < numColumns; i++) {
 			byte mappingByte = inputView.readByte();
 
-			ObjectInputStream ooIn = new ObjectInputStream(new DataInputViewStream(inputView));
+			ObjectInputStream ooIn = new InstantiationUtil.ClassLoaderObjectInputStream(new DataInputViewStream(inputView), userCodeClassLoader);
 			StateDescriptor stateDescriptor = (StateDescriptor) ooIn.readObject();
 
 			columnFamilyMapping.put(mappingByte, stateDescriptor);