You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2017/01/22 14:57:24 UTC
[2/4] flink git commit: [FLINK-5576] [queryable state] Check unconsumed bytes in deserializeValue
[FLINK-5576] [queryable state] Check unconsumed bytes in deserializeValue
KvStateRequestSerializer#deserializeValue deserializes a given byte array. This
is used by clients and unit tests and it is fair to assume that these byte arrays
represent a complete value since we do not offer a method to continue reading
from the middle of the array anyway. Therefore, we can treat unconsumed bytes
as errors, e.g. from a wrong serializer being used, and throw an IOException
with an appropriate failure message.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ef13f48e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ef13f48e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ef13f48e
Branch: refs/heads/release-1.2
Commit: ef13f48e1e35ed3502a5ef8dd40b76468dd85650
Parents: d8222c1
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Thu Jan 19 17:06:00 2017 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Sun Jan 22 15:56:22 2017 +0100
----------------------------------------------------------------------
.../netty/message/KvStateRequestSerializer.java | 12 +++++--
.../message/KvStateRequestSerializerTest.java | 38 ++++++++++++++++++++
2 files changed, 48 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ef13f48e/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java
index 6c8b4a5..5c60e59 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java
@@ -428,8 +428,16 @@ public final class KvStateRequestSerializer {
if (serializedValue == null) {
return null;
} else {
- DataInputDeserializer deser = new DataInputDeserializer(serializedValue, 0, serializedValue.length);
- return serializer.deserialize(deser);
+ final DataInputDeserializer deser = new DataInputDeserializer(
+ serializedValue, 0, serializedValue.length);
+ final T value = serializer.deserialize(deser);
+ if (deser.available() > 0) {
+ throw new IOException(
+ "Unconsumed bytes in the deserialized value. " +
+ "This indicates a mismatch in the value serializers " +
+ "used by the KvState instance and this access.");
+ }
+ return value;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ef13f48e/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
index a9aa416..9552531 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
@@ -36,6 +36,7 @@ import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
import org.junit.Test;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
@@ -233,6 +234,43 @@ public class KvStateRequestSerializerTest {
}
/**
+ * Tests value deserialization with too few bytes.
+ */
+ @Test(expected = IOException.class)
+ public void testDeserializeValueEmpty() throws Exception {
+ KvStateRequestSerializer.deserializeValue(new byte[] {}, LongSerializer.INSTANCE);
+ }
+
+ /**
+ * Tests value deserialization with too few bytes.
+ */
+ @Test(expected = IOException.class)
+ public void testDeserializeValueTooShort() throws Exception {
+ // 1 byte (incomplete Long)
+ KvStateRequestSerializer.deserializeValue(new byte[] {1}, LongSerializer.INSTANCE);
+ }
+
+ /**
+ * Tests value deserialization with too many bytes.
+ */
+ @Test(expected = IOException.class)
+ public void testDeserializeValueTooMany1() throws Exception {
+ // Long + 1 byte
+ KvStateRequestSerializer.deserializeValue(new byte[] {1, 1, 1, 1, 1, 1, 1, 1, 2},
+ LongSerializer.INSTANCE);
+ }
+
+ /**
+ * Tests value deserialization with too many bytes.
+ */
+ @Test(expected = IOException.class)
+ public void testDeserializeValueTooMany2() throws Exception {
+ // Long + 2 bytes
+ KvStateRequestSerializer.deserializeValue(new byte[] {1, 1, 1, 1, 1, 1, 1, 1, 2, 2},
+ LongSerializer.INSTANCE);
+ }
+
+ /**
* Tests list serialization utils.
*/
@Test