You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2017/01/22 14:55:18 UTC
[4/4] flink git commit: [FLINK-5559] [queryable state] Throw proper IOException in deserializeKeyAndNamespace
[FLINK-5559] [queryable state] Throw proper IOException in deserializeKeyAndNamespace
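Every IOException raised while deserializing the key and namespace is now
wrapped in an IOException whose message carries the hint that the failure
probably results from a "mismatch in the key/namespace serializers used by the
KvState instance and this access". The magic-number and unconsumed-bytes checks,
which previously threw IllegalArgumentException, now throw IOException as well.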
This closes #3172.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/21742b2d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/21742b2d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/21742b2d
Branch: refs/heads/master
Commit: 21742b2d77c0a6bb254f27e954f95efdab009539
Parents: 563c3a4
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Wed Jan 18 18:31:20 2017 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Sun Jan 22 12:00:14 2017 +0100
----------------------------------------------------------------------
.../netty/message/KvStateRequestSerializer.java | 33 ++++++++--------
.../query/netty/KvStateServerHandlerTest.java | 4 +-
.../message/KvStateRequestSerializerTest.java | 40 ++++++++++++++++++++
3 files changed, 59 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/21742b2d/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java
index eb37106..2f32861 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java
@@ -363,8 +363,7 @@ public final class KvStateRequestSerializer {
* @param <K> Key type
* @param <N> Namespace
* @return Tuple2 holding deserialized key and namespace
- * @throws IOException Serialization errors are forwarded
- * @throws IllegalStateException If unexpected magic number between key and namespace
+ * @throws IOException if the deserialization fails for any reason
*/
public static <K, N> Tuple2<K, N> deserializeKeyAndNamespace(
byte[] serializedKeyAndNamespace,
@@ -376,22 +375,24 @@ public final class KvStateRequestSerializer {
0,
serializedKeyAndNamespace.length);
- K key = keySerializer.deserialize(dis);
- byte magicNumber = dis.readByte();
- if (magicNumber != 42) {
- throw new IllegalArgumentException("Unexpected magic number " + magicNumber +
- ". This indicates a mismatch in the key serializers used by the " +
- "KvState instance and this access.");
- }
- N namespace = namespaceSerializer.deserialize(dis);
+ try {
+ K key = keySerializer.deserialize(dis);
+ byte magicNumber = dis.readByte();
+ if (magicNumber != 42) {
+ throw new IOException("Unexpected magic number " + magicNumber + ".");
+ }
+ N namespace = namespaceSerializer.deserialize(dis);
- if (dis.available() > 0) {
- throw new IllegalArgumentException("Unconsumed bytes in the serialized key " +
- "and namespace. This indicates a mismatch in the key/namespace " +
- "serializers used by the KvState instance and this access.");
- }
+ if (dis.available() > 0) {
+ throw new IOException("Unconsumed bytes in the serialized key and namespace.");
+ }
- return new Tuple2<>(key, namespace);
+ return new Tuple2<>(key, namespace);
+ } catch (IOException e) {
+ throw new IOException("Unable to deserialize key " +
+ "and namespace. This indicates a mismatch in the key/namespace " +
+ "serializers used by the KvState instance and this access.", e);
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/21742b2d/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java
index e8caf57..b1ec86f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java
@@ -554,7 +554,7 @@ public class KvStateServerHandlerTest extends TestLogger {
assertEquals(KvStateRequestType.REQUEST_FAILURE, KvStateRequestSerializer.deserializeHeader(buf));
KvStateRequestFailure response = KvStateRequestSerializer.deserializeKvStateRequestFailure(buf);
assertEquals(182828, response.getRequestId());
- assertTrue(response.getCause().getMessage().contains("IllegalArgumentException"));
+ assertTrue(response.getCause().getMessage().contains("IOException"));
// Repeat with wrong namespace only
request = KvStateRequestSerializer.serializeKvStateRequest(
@@ -573,7 +573,7 @@ public class KvStateServerHandlerTest extends TestLogger {
assertEquals(KvStateRequestType.REQUEST_FAILURE, KvStateRequestSerializer.deserializeHeader(buf));
response = KvStateRequestSerializer.deserializeKvStateRequestFailure(buf);
assertEquals(182829, response.getRequestId());
- assertTrue(response.getCause().getMessage().contains("IllegalArgumentException"));
+ assertTrue(response.getCause().getMessage().contains("IOException"));
assertEquals(2, stats.getNumRequests());
assertEquals(2, stats.getNumFailed());
http://git-wip-us.apache.org/repos/asf/flink/blob/21742b2d/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
index 9d6d27c..0d9c2e4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
@@ -220,6 +220,46 @@ public class KvStateRequestSerializerTest {
}
/**
+ * Tests key and namespace deserialization utils with an empty byte array.
+ */
+ @Test(expected = IOException.class)
+ public void testKeyAndNamespaceDeserializationEmpty() throws Exception {
+ KvStateRequestSerializer.deserializeKeyAndNamespace(
+ new byte[] {}, LongSerializer.INSTANCE, StringSerializer.INSTANCE);
+ }
+
+ /**
+ * Tests key and namespace deserialization utils with too few bytes.
+ */
+ @Test(expected = IOException.class)
+ public void testKeyAndNamespaceDeserializationTooShort() throws Exception {
+ KvStateRequestSerializer.deserializeKeyAndNamespace(
+ new byte[] {1}, LongSerializer.INSTANCE, StringSerializer.INSTANCE);
+ }
+
+ /**
+ * Tests key and namespace deserialization utils with too many bytes.
+ */
+ @Test(expected = IOException.class)
+ public void testKeyAndNamespaceDeserializationTooMany1() throws Exception {
+ // Long + null String + 1 byte
+ KvStateRequestSerializer.deserializeKeyAndNamespace(
+ new byte[] {1, 1, 1, 1, 1, 1, 1, 1, 42, 0, 2}, LongSerializer.INSTANCE,
+ StringSerializer.INSTANCE);
+ }
+
+ /**
+ * Tests key and namespace deserialization utils with too many bytes.
+ */
+ @Test(expected = IOException.class)
+ public void testKeyAndNamespaceDeserializationTooMany2() throws Exception {
+ // Long + null String + 2 bytes
+ KvStateRequestSerializer.deserializeKeyAndNamespace(
+ new byte[] {1, 1, 1, 1, 1, 1, 1, 1, 42, 0, 2, 2}, LongSerializer.INSTANCE,
+ StringSerializer.INSTANCE);
+ }
+
+ /**
* Tests value serialization utils.
*/
@Test