You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2017/01/22 14:55:15 UTC

[1/4] flink git commit: [FLINK-5576] [queryable state] Check unconsumed bytes in deserializeValue

Repository: flink
Updated Branches:
  refs/heads/master d16552dbe -> 21742b2d7


[FLINK-5576] [queryable state] Check unconsumed bytes in deserializeValue

KvStateRequestSerializer#deserializeValue deserializes a given byte array. This
is used by clients and unit tests and it is fair to assume that these byte arrays
represent a complete value since we do not offer a method to continue reading
from the middle of the array anyway. Therefore, we can treat unconsumed bytes
as errors, e.g. from a wrong serializer being used, and throw a IOException
with an appropriate failure message.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c1c6ef1e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c1c6ef1e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c1c6ef1e

Branch: refs/heads/master
Commit: c1c6ef1e7346cc5b3dfb6e8b7ae3e782b407b8c7
Parents: d16552d
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Thu Jan 19 17:06:00 2017 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Sun Jan 22 11:55:44 2017 +0100

----------------------------------------------------------------------
 .../netty/message/KvStateRequestSerializer.java | 12 +++++--
 .../message/KvStateRequestSerializerTest.java   | 38 ++++++++++++++++++++
 2 files changed, 48 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c1c6ef1e/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java
index 6c8b4a5..5c60e59 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java
@@ -428,8 +428,16 @@ public final class KvStateRequestSerializer {
 		if (serializedValue == null) {
 			return null;
 		} else {
-			DataInputDeserializer deser = new DataInputDeserializer(serializedValue, 0, serializedValue.length);
-			return serializer.deserialize(deser);
+			final DataInputDeserializer deser = new DataInputDeserializer(
+				serializedValue, 0, serializedValue.length);
+			final T value = serializer.deserialize(deser);
+			if (deser.available() > 0) {
+				throw new IOException(
+					"Unconsumed bytes in the deserialized value. " +
+						"This indicates a mismatch in the value serializers " +
+						"used by the KvState instance and this access.");
+			}
+			return value;
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c1c6ef1e/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
index a9aa416..9552531 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
@@ -36,6 +36,7 @@ import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
 import org.junit.Test;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ThreadLocalRandom;
@@ -233,6 +234,43 @@ public class KvStateRequestSerializerTest {
 	}
 
 	/**
+	 * Tests value deserialization with too few bytes.
+	 */
+	@Test(expected = IOException.class)
+	public void testDeserializeValueEmpty() throws Exception {
+		KvStateRequestSerializer.deserializeValue(new byte[] {}, LongSerializer.INSTANCE);
+	}
+
+	/**
+	 * Tests value deserialization with too few bytes.
+	 */
+	@Test(expected = IOException.class)
+	public void testDeserializeValueTooShort() throws Exception {
+		// 1 byte (incomplete Long)
+		KvStateRequestSerializer.deserializeValue(new byte[] {1}, LongSerializer.INSTANCE);
+	}
+
+	/**
+	 * Tests value deserialization with too many bytes.
+	 */
+	@Test(expected = IOException.class)
+	public void testDeserializeValueTooMany1() throws Exception {
+		// Long + 1 byte
+		KvStateRequestSerializer.deserializeValue(new byte[] {1, 1, 1, 1, 1, 1, 1, 1, 2},
+			LongSerializer.INSTANCE);
+	}
+
+	/**
+	 * Tests value deserialization with too many bytes.
+	 */
+	@Test(expected = IOException.class)
+	public void testDeserializeValueTooMany2() throws Exception {
+		// Long + 2 bytes
+		KvStateRequestSerializer.deserializeValue(new byte[] {1, 1, 1, 1, 1, 1, 1, 1, 2, 2},
+			LongSerializer.INSTANCE);
+	}
+
+	/**
 	 * Tests list serialization utils.
 	 */
 	@Test


[2/4] flink git commit: [FLINK-5576] [queryable state] Improve failure message deserializeList

Posted by uc...@apache.org.
[FLINK-5576] [queryable state] Improve failure message deserializeList

As in FLINK-5559, wrap the original IOException into a new one with an
appropriate error message to better diagnose it.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3fe2cf54
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3fe2cf54
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3fe2cf54

Branch: refs/heads/master
Commit: 3fe2cf544d0e51ddc31866943f60bd0938c32f15
Parents: c1c6ef1
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Thu Jan 19 17:10:54 2017 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Sun Jan 22 11:58:08 2017 +0100

----------------------------------------------------------------------
 .../netty/message/KvStateRequestSerializer.java | 40 ++++++++++++--------
 1 file changed, 24 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3fe2cf54/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java
index 5c60e59..eb37106 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java
@@ -453,24 +453,32 @@ public final class KvStateRequestSerializer {
 	 */
 	public static <T> List<T> deserializeList(byte[] serializedValue, TypeSerializer<T> serializer) throws IOException {
 		if (serializedValue != null) {
-			DataInputDeserializer in = new DataInputDeserializer(serializedValue, 0, serializedValue.length);
-
-			List<T> result = new ArrayList<>();
-			while (in.available() > 0) {
-				result.add(serializer.deserialize(in));
-
-				// The expected binary format has a single byte separator. We
-				// want a consistent binary format in order to not need any
-				// special casing during deserialization. A "cleaner" format
-				// would skip this extra byte, but would require a memory copy
-				// for RocksDB, which stores the data serialized in this way
-				// for lists.
-				if (in.available() > 0) {
-					in.readByte();
+			final DataInputDeserializer in = new DataInputDeserializer(
+				serializedValue, 0, serializedValue.length);
+
+			try {
+				final List<T> result = new ArrayList<>();
+				while (in.available() > 0) {
+					result.add(serializer.deserialize(in));
+
+					// The expected binary format has a single byte separator. We
+					// want a consistent binary format in order to not need any
+					// special casing during deserialization. A "cleaner" format
+					// would skip this extra byte, but would require a memory copy
+					// for RocksDB, which stores the data serialized in this way
+					// for lists.
+					if (in.available() > 0) {
+						in.readByte();
+					}
 				}
-			}
 
-			return result;
+				return result;
+			} catch (IOException e) {
+				throw new IOException(
+						"Unable to deserialize value. " +
+							"This indicates a mismatch in the value serializers " +
+							"used by the KvState instance and this access.", e);
+			}
 		} else {
 			return null;
 		}


[3/4] flink git commit: [FLINK-5576] [queryable state] Add tests to for KvStateRequestSerializerTest

Posted by uc...@apache.org.
[FLINK-5576] [queryable state] Add tests to for KvStateRequestSerializerTest

These tests ensure that some special cases not properly tested before are
handled correctly in future.

This closes #3174.
This closes #3139 (left over).


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/563c3a4a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/563c3a4a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/563c3a4a

Branch: refs/heads/master
Commit: 563c3a4a54535ae6cb3812303812b0d337d62ef2
Parents: 3fe2cf5
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Thu Jan 19 17:12:02 2017 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Sun Jan 22 11:59:11 2017 +0100

----------------------------------------------------------------------
 .../message/KvStateRequestSerializerTest.java   | 29 ++++++++++++++++++++
 1 file changed, 29 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/563c3a4a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
index 9552531..9d6d27c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
@@ -342,6 +342,35 @@ public class KvStateRequestSerializerTest {
 		assertEquals(expectedValue, actualValue.get(0).longValue());
 	}
 
+	/**
+	 * Tests list deserialization with too few bytes.
+	 */
+	@Test
+	public void testDeserializeListEmpty() throws Exception {
+		List<Long> actualValue = KvStateRequestSerializer
+			.deserializeList(new byte[] {}, LongSerializer.INSTANCE);
+		assertEquals(0, actualValue.size());
+	}
+
+	/**
+	 * Tests list deserialization with too few bytes.
+	 */
+	@Test(expected = IOException.class)
+	public void testDeserializeListTooShort1() throws Exception {
+		// 1 byte (incomplete Long)
+		KvStateRequestSerializer.deserializeList(new byte[] {1}, LongSerializer.INSTANCE);
+	}
+
+	/**
+	 * Tests list deserialization with too few bytes.
+	 */
+	@Test(expected = IOException.class)
+	public void testDeserializeListTooShort2() throws Exception {
+		// Long + 1 byte (separator) + 1 byte (incomplete Long)
+		KvStateRequestSerializer.deserializeList(new byte[] {1, 1, 1, 1, 1, 1, 1, 1, 2, 3},
+			LongSerializer.INSTANCE);
+	}
+
 	private byte[] randomByteArray(int capacity) {
 		byte[] bytes = new byte[capacity];
 		ThreadLocalRandom.current().nextBytes(bytes);


[4/4] flink git commit: [FLINK-5559] [queryable state] Throw proper IOException deserializeKeyAndNamespace

Posted by uc...@apache.org.
[FLINK-5559] [queryable state] Throw proper IOException deserializeKeyAndNamespace

This adds the hint that a deserialisation failure probably results from a
"mismatch in the key/namespace serializers used by the KvState instance and this
access" to all thrown exceptions.

This closes #3172.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/21742b2d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/21742b2d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/21742b2d

Branch: refs/heads/master
Commit: 21742b2d77c0a6bb254f27e954f95efdab009539
Parents: 563c3a4
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Wed Jan 18 18:31:20 2017 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Sun Jan 22 12:00:14 2017 +0100

----------------------------------------------------------------------
 .../netty/message/KvStateRequestSerializer.java | 33 ++++++++--------
 .../query/netty/KvStateServerHandlerTest.java   |  4 +-
 .../message/KvStateRequestSerializerTest.java   | 40 ++++++++++++++++++++
 3 files changed, 59 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/21742b2d/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java
index eb37106..2f32861 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java
@@ -363,8 +363,7 @@ public final class KvStateRequestSerializer {
 	 * @param <K>                       Key type
 	 * @param <N>                       Namespace
 	 * @return Tuple2 holding deserialized key and namespace
-	 * @throws IOException           Serialization errors are forwarded
-	 * @throws IllegalStateException If unexpected magic number between key and namespace
+	 * @throws IOException              if the deserialization fails for any reason
 	 */
 	public static <K, N> Tuple2<K, N> deserializeKeyAndNamespace(
 			byte[] serializedKeyAndNamespace,
@@ -376,22 +375,24 @@ public final class KvStateRequestSerializer {
 				0,
 				serializedKeyAndNamespace.length);
 
-		K key = keySerializer.deserialize(dis);
-		byte magicNumber = dis.readByte();
-		if (magicNumber != 42) {
-			throw new IllegalArgumentException("Unexpected magic number " + magicNumber +
-					". This indicates a mismatch in the key serializers used by the " +
-					"KvState instance and this access.");
-		}
-		N namespace = namespaceSerializer.deserialize(dis);
+		try {
+			K key = keySerializer.deserialize(dis);
+			byte magicNumber = dis.readByte();
+			if (magicNumber != 42) {
+				throw new IOException("Unexpected magic number " + magicNumber + ".");
+			}
+			N namespace = namespaceSerializer.deserialize(dis);
 
-		if (dis.available() > 0) {
-			throw new IllegalArgumentException("Unconsumed bytes in the serialized key " +
-					"and namespace. This indicates a mismatch in the key/namespace " +
-					"serializers used by the KvState instance and this access.");
-		}
+			if (dis.available() > 0) {
+				throw new IOException("Unconsumed bytes in the serialized key and namespace.");
+			}
 
-		return new Tuple2<>(key, namespace);
+			return new Tuple2<>(key, namespace);
+		} catch (IOException e) {
+			throw new IOException("Unable to deserialize key " +
+				"and namespace. This indicates a mismatch in the key/namespace " +
+				"serializers used by the KvState instance and this access.", e);
+		}
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/21742b2d/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java
index e8caf57..b1ec86f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java
@@ -554,7 +554,7 @@ public class KvStateServerHandlerTest extends TestLogger {
 		assertEquals(KvStateRequestType.REQUEST_FAILURE, KvStateRequestSerializer.deserializeHeader(buf));
 		KvStateRequestFailure response = KvStateRequestSerializer.deserializeKvStateRequestFailure(buf);
 		assertEquals(182828, response.getRequestId());
-		assertTrue(response.getCause().getMessage().contains("IllegalArgumentException"));
+		assertTrue(response.getCause().getMessage().contains("IOException"));
 
 		// Repeat with wrong namespace only
 		request = KvStateRequestSerializer.serializeKvStateRequest(
@@ -573,7 +573,7 @@ public class KvStateServerHandlerTest extends TestLogger {
 		assertEquals(KvStateRequestType.REQUEST_FAILURE, KvStateRequestSerializer.deserializeHeader(buf));
 		response = KvStateRequestSerializer.deserializeKvStateRequestFailure(buf);
 		assertEquals(182829, response.getRequestId());
-		assertTrue(response.getCause().getMessage().contains("IllegalArgumentException"));
+		assertTrue(response.getCause().getMessage().contains("IOException"));
 
 		assertEquals(2, stats.getNumRequests());
 		assertEquals(2, stats.getNumFailed());

http://git-wip-us.apache.org/repos/asf/flink/blob/21742b2d/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
index 9d6d27c..0d9c2e4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
@@ -220,6 +220,46 @@ public class KvStateRequestSerializerTest {
 	}
 
 	/**
+	 * Tests key and namespace deserialization utils with too few bytes.
+	 */
+	@Test(expected = IOException.class)
+	public void testKeyAndNamespaceDeserializationEmpty() throws Exception {
+		KvStateRequestSerializer.deserializeKeyAndNamespace(
+			new byte[] {}, LongSerializer.INSTANCE, StringSerializer.INSTANCE);
+	}
+
+	/**
+	 * Tests key and namespace deserialization utils with too few bytes.
+	 */
+	@Test(expected = IOException.class)
+	public void testKeyAndNamespaceDeserializationTooShort() throws Exception {
+		KvStateRequestSerializer.deserializeKeyAndNamespace(
+			new byte[] {1}, LongSerializer.INSTANCE, StringSerializer.INSTANCE);
+	}
+
+	/**
+	 * Tests key and namespace deserialization utils with too many bytes.
+	 */
+	@Test(expected = IOException.class)
+	public void testKeyAndNamespaceDeserializationTooMany1() throws Exception {
+		// Long + null String + 1 byte
+		KvStateRequestSerializer.deserializeKeyAndNamespace(
+			new byte[] {1, 1, 1, 1, 1, 1, 1, 1, 42, 0, 2}, LongSerializer.INSTANCE,
+			StringSerializer.INSTANCE);
+	}
+
+	/**
+	 * Tests key and namespace deserialization utils with too many bytes.
+	 */
+	@Test(expected = IOException.class)
+	public void testKeyAndNamespaceDeserializationTooMany2() throws Exception {
+		// Long + null String + 2 bytes
+		KvStateRequestSerializer.deserializeKeyAndNamespace(
+			new byte[] {1, 1, 1, 1, 1, 1, 1, 1, 42, 0, 2, 2}, LongSerializer.INSTANCE,
+			StringSerializer.INSTANCE);
+	}
+
+	/**
 	 * Tests value serialization utils.
 	 */
 	@Test