You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2017/01/22 14:57:24 UTC

[2/4] flink git commit: [FLINK-5576] [queryable state] Check unconsumed bytes in deserializeValue

[FLINK-5576] [queryable state] Check unconsumed bytes in deserializeValue

KvStateRequestSerializer#deserializeValue deserializes a given byte array. This
is used by clients and unit tests and it is fair to assume that these byte arrays
represent a complete value since we do not offer a method to continue reading
from the middle of the array anyway. Therefore, we can treat unconsumed bytes
as errors, e.g. from a wrong serializer being used, and throw an IOException
with an appropriate failure message.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ef13f48e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ef13f48e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ef13f48e

Branch: refs/heads/release-1.2
Commit: ef13f48e1e35ed3502a5ef8dd40b76468dd85650
Parents: d8222c1
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Thu Jan 19 17:06:00 2017 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Sun Jan 22 15:56:22 2017 +0100

----------------------------------------------------------------------
 .../netty/message/KvStateRequestSerializer.java | 12 +++++--
 .../message/KvStateRequestSerializerTest.java   | 38 ++++++++++++++++++++
 2 files changed, 48 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ef13f48e/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java
index 6c8b4a5..5c60e59 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java
@@ -428,8 +428,16 @@ public final class KvStateRequestSerializer {
 		if (serializedValue == null) {
 			return null;
 		} else {
-			DataInputDeserializer deser = new DataInputDeserializer(serializedValue, 0, serializedValue.length);
-			return serializer.deserialize(deser);
+			final DataInputDeserializer deser = new DataInputDeserializer(
+				serializedValue, 0, serializedValue.length);
+			final T value = serializer.deserialize(deser);
+			if (deser.available() > 0) {
+				throw new IOException(
+					"Unconsumed bytes in the deserialized value. " +
+						"This indicates a mismatch in the value serializers " +
+						"used by the KvState instance and this access.");
+			}
+			return value;
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ef13f48e/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
index a9aa416..9552531 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
@@ -36,6 +36,7 @@ import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
 import org.junit.Test;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ThreadLocalRandom;
@@ -233,6 +234,43 @@ public class KvStateRequestSerializerTest {
 	}
 
 	/**
+	 * Tests value deserialization with too few bytes.
+	 */
+	@Test(expected = IOException.class)
+	public void testDeserializeValueEmpty() throws Exception {
+		KvStateRequestSerializer.deserializeValue(new byte[] {}, LongSerializer.INSTANCE);
+	}
+
+	/**
+	 * Tests value deserialization with too few bytes.
+	 */
+	@Test(expected = IOException.class)
+	public void testDeserializeValueTooShort() throws Exception {
+		// 1 byte (incomplete Long)
+		KvStateRequestSerializer.deserializeValue(new byte[] {1}, LongSerializer.INSTANCE);
+	}
+
+	/**
+	 * Tests value deserialization with too many bytes.
+	 */
+	@Test(expected = IOException.class)
+	public void testDeserializeValueTooMany1() throws Exception {
+		// Long + 1 byte
+		KvStateRequestSerializer.deserializeValue(new byte[] {1, 1, 1, 1, 1, 1, 1, 1, 2},
+			LongSerializer.INSTANCE);
+	}
+
+	/**
+	 * Tests value deserialization with too many bytes.
+	 */
+	@Test(expected = IOException.class)
+	public void testDeserializeValueTooMany2() throws Exception {
+		// Long + 2 bytes
+		KvStateRequestSerializer.deserializeValue(new byte[] {1, 1, 1, 1, 1, 1, 1, 1, 2, 2},
+			LongSerializer.INSTANCE);
+	}
+
+	/**
 	 * Tests list serialization utils.
 	 */
 	@Test