You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2017/01/22 14:55:16 UTC

[2/4] flink git commit: [FLINK-5576] [queryable state] Improve failure message deserializeList

[FLINK-5576] [queryable state] Improve failure message deserializeList

As in FLINK-5559, wrap the original IOException into a new one with an
appropriate error message to better diagnose it.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3fe2cf54
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3fe2cf54
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3fe2cf54

Branch: refs/heads/master
Commit: 3fe2cf544d0e51ddc31866943f60bd0938c32f15
Parents: c1c6ef1
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Thu Jan 19 17:10:54 2017 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Sun Jan 22 11:58:08 2017 +0100

----------------------------------------------------------------------
 .../netty/message/KvStateRequestSerializer.java | 40 ++++++++++++--------
 1 file changed, 24 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3fe2cf54/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java
index 5c60e59..eb37106 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java
@@ -453,24 +453,32 @@ public final class KvStateRequestSerializer {
 	 */
 	public static <T> List<T> deserializeList(byte[] serializedValue, TypeSerializer<T> serializer) throws IOException {
 		if (serializedValue != null) {
-			DataInputDeserializer in = new DataInputDeserializer(serializedValue, 0, serializedValue.length);
-
-			List<T> result = new ArrayList<>();
-			while (in.available() > 0) {
-				result.add(serializer.deserialize(in));
-
-				// The expected binary format has a single byte separator. We
-				// want a consistent binary format in order to not need any
-				// special casing during deserialization. A "cleaner" format
-				// would skip this extra byte, but would require a memory copy
-				// for RocksDB, which stores the data serialized in this way
-				// for lists.
-				if (in.available() > 0) {
-					in.readByte();
+			final DataInputDeserializer in = new DataInputDeserializer(
+				serializedValue, 0, serializedValue.length);
+
+			try {
+				final List<T> result = new ArrayList<>();
+				while (in.available() > 0) {
+					result.add(serializer.deserialize(in));
+
+					// The expected binary format has a single byte separator. We
+					// want a consistent binary format in order to not need any
+					// special casing during deserialization. A "cleaner" format
+					// would skip this extra byte, but would require a memory copy
+					// for RocksDB, which stores the data serialized in this way
+					// for lists.
+					if (in.available() > 0) {
+						in.readByte();
+					}
 				}
-			}
 
-			return result;
+				return result;
+			} catch (IOException e) {
+				throw new IOException(
+						"Unable to deserialize value. " +
+							"This indicates a mismatch in the value serializers " +
+							"used by the KvState instance and this access.", e);
+			}
 		} else {
 			return null;
 		}