You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2017/01/22 14:55:18 UTC

[4/4] flink git commit: [FLINK-5559] [queryable state] Throw proper IOException in deserializeKeyAndNamespace

[FLINK-5559] [queryable state] Throw proper IOException in deserializeKeyAndNamespace

This adds the hint that a deserialisation failure probably results from a
"mismatch in the key/namespace serializers used by the KvState instance and this
access" to all thrown exceptions.

This closes #3172.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/21742b2d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/21742b2d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/21742b2d

Branch: refs/heads/master
Commit: 21742b2d77c0a6bb254f27e954f95efdab009539
Parents: 563c3a4
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Wed Jan 18 18:31:20 2017 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Sun Jan 22 12:00:14 2017 +0100

----------------------------------------------------------------------
 .../netty/message/KvStateRequestSerializer.java | 33 ++++++++--------
 .../query/netty/KvStateServerHandlerTest.java   |  4 +-
 .../message/KvStateRequestSerializerTest.java   | 40 ++++++++++++++++++++
 3 files changed, 59 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/21742b2d/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java
index eb37106..2f32861 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java
@@ -363,8 +363,7 @@ public final class KvStateRequestSerializer {
 	 * @param <K>                       Key type
 	 * @param <N>                       Namespace
 	 * @return Tuple2 holding deserialized key and namespace
-	 * @throws IOException           Serialization errors are forwarded
-	 * @throws IllegalStateException If unexpected magic number between key and namespace
+	 * @throws IOException              if the deserialization fails for any reason
 	 */
 	public static <K, N> Tuple2<K, N> deserializeKeyAndNamespace(
 			byte[] serializedKeyAndNamespace,
@@ -376,22 +375,24 @@ public final class KvStateRequestSerializer {
 				0,
 				serializedKeyAndNamespace.length);
 
-		K key = keySerializer.deserialize(dis);
-		byte magicNumber = dis.readByte();
-		if (magicNumber != 42) {
-			throw new IllegalArgumentException("Unexpected magic number " + magicNumber +
-					". This indicates a mismatch in the key serializers used by the " +
-					"KvState instance and this access.");
-		}
-		N namespace = namespaceSerializer.deserialize(dis);
+		try {
+			K key = keySerializer.deserialize(dis);
+			byte magicNumber = dis.readByte();
+			if (magicNumber != 42) {
+				throw new IOException("Unexpected magic number " + magicNumber + ".");
+			}
+			N namespace = namespaceSerializer.deserialize(dis);
 
-		if (dis.available() > 0) {
-			throw new IllegalArgumentException("Unconsumed bytes in the serialized key " +
-					"and namespace. This indicates a mismatch in the key/namespace " +
-					"serializers used by the KvState instance and this access.");
-		}
+			if (dis.available() > 0) {
+				throw new IOException("Unconsumed bytes in the serialized key and namespace.");
+			}
 
-		return new Tuple2<>(key, namespace);
+			return new Tuple2<>(key, namespace);
+		} catch (IOException e) {
+			throw new IOException("Unable to deserialize key " +
+				"and namespace. This indicates a mismatch in the key/namespace " +
+				"serializers used by the KvState instance and this access.", e);
+		}
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/21742b2d/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java
index e8caf57..b1ec86f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java
@@ -554,7 +554,7 @@ public class KvStateServerHandlerTest extends TestLogger {
 		assertEquals(KvStateRequestType.REQUEST_FAILURE, KvStateRequestSerializer.deserializeHeader(buf));
 		KvStateRequestFailure response = KvStateRequestSerializer.deserializeKvStateRequestFailure(buf);
 		assertEquals(182828, response.getRequestId());
-		assertTrue(response.getCause().getMessage().contains("IllegalArgumentException"));
+		assertTrue(response.getCause().getMessage().contains("IOException"));
 
 		// Repeat with wrong namespace only
 		request = KvStateRequestSerializer.serializeKvStateRequest(
@@ -573,7 +573,7 @@ public class KvStateServerHandlerTest extends TestLogger {
 		assertEquals(KvStateRequestType.REQUEST_FAILURE, KvStateRequestSerializer.deserializeHeader(buf));
 		response = KvStateRequestSerializer.deserializeKvStateRequestFailure(buf);
 		assertEquals(182829, response.getRequestId());
-		assertTrue(response.getCause().getMessage().contains("IllegalArgumentException"));
+		assertTrue(response.getCause().getMessage().contains("IOException"));
 
 		assertEquals(2, stats.getNumRequests());
 		assertEquals(2, stats.getNumFailed());

http://git-wip-us.apache.org/repos/asf/flink/blob/21742b2d/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
index 9d6d27c..0d9c2e4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
@@ -220,6 +220,46 @@ public class KvStateRequestSerializerTest {
 	}
 
 	/**
+	 * Tests key and namespace deserialization utils with too few bytes.
+	 */
+	@Test(expected = IOException.class)
+	public void testKeyAndNamespaceDeserializationEmpty() throws Exception {
+		KvStateRequestSerializer.deserializeKeyAndNamespace(
+			new byte[] {}, LongSerializer.INSTANCE, StringSerializer.INSTANCE);
+	}
+
+	/**
+	 * Tests key and namespace deserialization utils with too few bytes.
+	 */
+	@Test(expected = IOException.class)
+	public void testKeyAndNamespaceDeserializationTooShort() throws Exception {
+		KvStateRequestSerializer.deserializeKeyAndNamespace(
+			new byte[] {1}, LongSerializer.INSTANCE, StringSerializer.INSTANCE);
+	}
+
+	/**
+	 * Tests key and namespace deserialization utils with too many bytes.
+	 */
+	@Test(expected = IOException.class)
+	public void testKeyAndNamespaceDeserializationTooMany1() throws Exception {
+		// Long + null String + 1 byte
+		KvStateRequestSerializer.deserializeKeyAndNamespace(
+			new byte[] {1, 1, 1, 1, 1, 1, 1, 1, 42, 0, 2}, LongSerializer.INSTANCE,
+			StringSerializer.INSTANCE);
+	}
+
+	/**
+	 * Tests key and namespace deserialization utils with too many bytes.
+	 */
+	@Test(expected = IOException.class)
+	public void testKeyAndNamespaceDeserializationTooMany2() throws Exception {
+		// Long + null String + 2 bytes
+		KvStateRequestSerializer.deserializeKeyAndNamespace(
+			new byte[] {1, 1, 1, 1, 1, 1, 1, 1, 42, 0, 2, 2}, LongSerializer.INSTANCE,
+			StringSerializer.INSTANCE);
+	}
+
+	/**
 	 * Tests value serialization utils.
 	 */
 	@Test