You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2017/01/22 14:55:16 UTC
[2/4] flink git commit: [FLINK-5576] [queryable state] Improve failure message deserializeList
[FLINK-5576] [queryable state] Improve failure message deserializeList
As in FLINK-5559, wrap the original IOException in a new IOException whose
message points to the likely cause (a mismatch between the value serializers),
so that deserialization failures are easier to diagnose.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3fe2cf54
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3fe2cf54
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3fe2cf54
Branch: refs/heads/master
Commit: 3fe2cf544d0e51ddc31866943f60bd0938c32f15
Parents: c1c6ef1
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Thu Jan 19 17:10:54 2017 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Sun Jan 22 11:58:08 2017 +0100
----------------------------------------------------------------------
.../netty/message/KvStateRequestSerializer.java | 40 ++++++++++++--------
1 file changed, 24 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/3fe2cf54/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java
index 5c60e59..eb37106 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java
@@ -453,24 +453,32 @@ public final class KvStateRequestSerializer {
*/
public static <T> List<T> deserializeList(byte[] serializedValue, TypeSerializer<T> serializer) throws IOException {
if (serializedValue != null) {
- DataInputDeserializer in = new DataInputDeserializer(serializedValue, 0, serializedValue.length);
-
- List<T> result = new ArrayList<>();
- while (in.available() > 0) {
- result.add(serializer.deserialize(in));
-
- // The expected binary format has a single byte separator. We
- // want a consistent binary format in order to not need any
- // special casing during deserialization. A "cleaner" format
- // would skip this extra byte, but would require a memory copy
- // for RocksDB, which stores the data serialized in this way
- // for lists.
- if (in.available() > 0) {
- in.readByte();
+ final DataInputDeserializer in = new DataInputDeserializer(
+ serializedValue, 0, serializedValue.length);
+
+ try {
+ final List<T> result = new ArrayList<>();
+ while (in.available() > 0) {
+ result.add(serializer.deserialize(in));
+
+ // The expected binary format has a single byte separator. We
+ // want a consistent binary format in order to not need any
+ // special casing during deserialization. A "cleaner" format
+ // would skip this extra byte, but would require a memory copy
+ // for RocksDB, which stores the data serialized in this way
+ // for lists.
+ if (in.available() > 0) {
+ in.readByte();
+ }
}
- }
- return result;
+ return result;
+ } catch (IOException e) {
+ throw new IOException(
+ "Unable to deserialize value. " +
+ "This indicates a mismatch in the value serializers " +
+ "used by the KvState instance and this access.", e);
+ }
} else {
return null;
}