You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2017/01/22 14:55:15 UTC
[1/4] flink git commit: [FLINK-5576] [queryable state] Check
unconsumed bytes in deserializeValue
Repository: flink
Updated Branches:
refs/heads/master d16552dbe -> 21742b2d7
[FLINK-5576] [queryable state] Check unconsumed bytes in deserializeValue
KvStateRequestSerializer#deserializeValue deserializes a given byte array. This
is used by clients and unit tests and it is fair to assume that these byte arrays
represent a complete value since we do not offer a method to continue reading
from the middle of the array anyway. Therefore, we can treat unconsumed bytes
as errors, e.g. from a wrong serializer being used, and throw an IOException
with an appropriate failure message.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c1c6ef1e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c1c6ef1e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c1c6ef1e
Branch: refs/heads/master
Commit: c1c6ef1e7346cc5b3dfb6e8b7ae3e782b407b8c7
Parents: d16552d
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Thu Jan 19 17:06:00 2017 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Sun Jan 22 11:55:44 2017 +0100
----------------------------------------------------------------------
.../netty/message/KvStateRequestSerializer.java | 12 +++++--
.../message/KvStateRequestSerializerTest.java | 38 ++++++++++++++++++++
2 files changed, 48 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c1c6ef1e/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java
index 6c8b4a5..5c60e59 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java
@@ -428,8 +428,16 @@ public final class KvStateRequestSerializer {
if (serializedValue == null) {
return null;
} else {
- DataInputDeserializer deser = new DataInputDeserializer(serializedValue, 0, serializedValue.length);
- return serializer.deserialize(deser);
+ final DataInputDeserializer deser = new DataInputDeserializer(
+ serializedValue, 0, serializedValue.length);
+ final T value = serializer.deserialize(deser);
+ if (deser.available() > 0) {
+ throw new IOException(
+ "Unconsumed bytes in the deserialized value. " +
+ "This indicates a mismatch in the value serializers " +
+ "used by the KvState instance and this access.");
+ }
+ return value;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c1c6ef1e/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
index a9aa416..9552531 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
@@ -36,6 +36,7 @@ import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
import org.junit.Test;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
@@ -233,6 +234,43 @@ public class KvStateRequestSerializerTest {
}
/**
+ * Tests value deserialization with too few bytes.
+ */
+ @Test(expected = IOException.class)
+ public void testDeserializeValueEmpty() throws Exception {
+ KvStateRequestSerializer.deserializeValue(new byte[] {}, LongSerializer.INSTANCE);
+ }
+
+ /**
+ * Tests value deserialization with too few bytes.
+ */
+ @Test(expected = IOException.class)
+ public void testDeserializeValueTooShort() throws Exception {
+ // 1 byte (incomplete Long)
+ KvStateRequestSerializer.deserializeValue(new byte[] {1}, LongSerializer.INSTANCE);
+ }
+
+ /**
+ * Tests value deserialization with too many bytes.
+ */
+ @Test(expected = IOException.class)
+ public void testDeserializeValueTooMany1() throws Exception {
+ // Long + 1 byte
+ KvStateRequestSerializer.deserializeValue(new byte[] {1, 1, 1, 1, 1, 1, 1, 1, 2},
+ LongSerializer.INSTANCE);
+ }
+
+ /**
+ * Tests value deserialization with too many bytes.
+ */
+ @Test(expected = IOException.class)
+ public void testDeserializeValueTooMany2() throws Exception {
+ // Long + 2 bytes
+ KvStateRequestSerializer.deserializeValue(new byte[] {1, 1, 1, 1, 1, 1, 1, 1, 2, 2},
+ LongSerializer.INSTANCE);
+ }
+
+ /**
* Tests list serialization utils.
*/
@Test
[2/4] flink git commit: [FLINK-5576] [queryable state] Improve
failure message deserializeList
Posted by uc...@apache.org.
[FLINK-5576] [queryable state] Improve failure message deserializeList
As in FLINK-5559, wrap the original IOException in a new one with an
appropriate error message so that the failure is easier to diagnose.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3fe2cf54
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3fe2cf54
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3fe2cf54
Branch: refs/heads/master
Commit: 3fe2cf544d0e51ddc31866943f60bd0938c32f15
Parents: c1c6ef1
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Thu Jan 19 17:10:54 2017 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Sun Jan 22 11:58:08 2017 +0100
----------------------------------------------------------------------
.../netty/message/KvStateRequestSerializer.java | 40 ++++++++++++--------
1 file changed, 24 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/3fe2cf54/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java
index 5c60e59..eb37106 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java
@@ -453,24 +453,32 @@ public final class KvStateRequestSerializer {
*/
public static <T> List<T> deserializeList(byte[] serializedValue, TypeSerializer<T> serializer) throws IOException {
if (serializedValue != null) {
- DataInputDeserializer in = new DataInputDeserializer(serializedValue, 0, serializedValue.length);
-
- List<T> result = new ArrayList<>();
- while (in.available() > 0) {
- result.add(serializer.deserialize(in));
-
- // The expected binary format has a single byte separator. We
- // want a consistent binary format in order to not need any
- // special casing during deserialization. A "cleaner" format
- // would skip this extra byte, but would require a memory copy
- // for RocksDB, which stores the data serialized in this way
- // for lists.
- if (in.available() > 0) {
- in.readByte();
+ final DataInputDeserializer in = new DataInputDeserializer(
+ serializedValue, 0, serializedValue.length);
+
+ try {
+ final List<T> result = new ArrayList<>();
+ while (in.available() > 0) {
+ result.add(serializer.deserialize(in));
+
+ // The expected binary format has a single byte separator. We
+ // want a consistent binary format in order to not need any
+ // special casing during deserialization. A "cleaner" format
+ // would skip this extra byte, but would require a memory copy
+ // for RocksDB, which stores the data serialized in this way
+ // for lists.
+ if (in.available() > 0) {
+ in.readByte();
+ }
}
- }
- return result;
+ return result;
+ } catch (IOException e) {
+ throw new IOException(
+ "Unable to deserialize value. " +
+ "This indicates a mismatch in the value serializers " +
+ "used by the KvState instance and this access.", e);
+ }
} else {
return null;
}
[3/4] flink git commit: [FLINK-5576] [queryable state] Add tests to
KvStateRequestSerializerTest
Posted by uc...@apache.org.
[FLINK-5576] [queryable state] Add tests to KvStateRequestSerializerTest
These tests ensure that some special cases that were not properly tested before
are handled correctly in the future.
This closes #3174.
This closes #3139 (left over).
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/563c3a4a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/563c3a4a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/563c3a4a
Branch: refs/heads/master
Commit: 563c3a4a54535ae6cb3812303812b0d337d62ef2
Parents: 3fe2cf5
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Thu Jan 19 17:12:02 2017 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Sun Jan 22 11:59:11 2017 +0100
----------------------------------------------------------------------
.../message/KvStateRequestSerializerTest.java | 29 ++++++++++++++++++++
1 file changed, 29 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/563c3a4a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
index 9552531..9d6d27c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
@@ -342,6 +342,35 @@ public class KvStateRequestSerializerTest {
assertEquals(expectedValue, actualValue.get(0).longValue());
}
+ /**
+ * Tests list deserialization with too few bytes.
+ */
+ @Test
+ public void testDeserializeListEmpty() throws Exception {
+ List<Long> actualValue = KvStateRequestSerializer
+ .deserializeList(new byte[] {}, LongSerializer.INSTANCE);
+ assertEquals(0, actualValue.size());
+ }
+
+ /**
+ * Tests list deserialization with too few bytes.
+ */
+ @Test(expected = IOException.class)
+ public void testDeserializeListTooShort1() throws Exception {
+ // 1 byte (incomplete Long)
+ KvStateRequestSerializer.deserializeList(new byte[] {1}, LongSerializer.INSTANCE);
+ }
+
+ /**
+ * Tests list deserialization with too few bytes.
+ */
+ @Test(expected = IOException.class)
+ public void testDeserializeListTooShort2() throws Exception {
+ // Long + 1 byte (separator) + 1 byte (incomplete Long)
+ KvStateRequestSerializer.deserializeList(new byte[] {1, 1, 1, 1, 1, 1, 1, 1, 2, 3},
+ LongSerializer.INSTANCE);
+ }
+
private byte[] randomByteArray(int capacity) {
byte[] bytes = new byte[capacity];
ThreadLocalRandom.current().nextBytes(bytes);
[4/4] flink git commit: [FLINK-5559] [queryable state] Throw proper
IOException deserializeKeyAndNamespace
Posted by uc...@apache.org.
[FLINK-5559] [queryable state] Throw proper IOException deserializeKeyAndNamespace
This adds the hint that a deserialisation failure probably results from a
"mismatch in the key/namespace serializers used by the KvState instance and this
access" to all thrown exceptions.
This closes #3172.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/21742b2d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/21742b2d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/21742b2d
Branch: refs/heads/master
Commit: 21742b2d77c0a6bb254f27e954f95efdab009539
Parents: 563c3a4
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Wed Jan 18 18:31:20 2017 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Sun Jan 22 12:00:14 2017 +0100
----------------------------------------------------------------------
.../netty/message/KvStateRequestSerializer.java | 33 ++++++++--------
.../query/netty/KvStateServerHandlerTest.java | 4 +-
.../message/KvStateRequestSerializerTest.java | 40 ++++++++++++++++++++
3 files changed, 59 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/21742b2d/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java
index eb37106..2f32861 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java
@@ -363,8 +363,7 @@ public final class KvStateRequestSerializer {
* @param <K> Key type
* @param <N> Namespace
* @return Tuple2 holding deserialized key and namespace
- * @throws IOException Serialization errors are forwarded
- * @throws IllegalStateException If unexpected magic number between key and namespace
+ * @throws IOException if the deserialization fails for any reason
*/
public static <K, N> Tuple2<K, N> deserializeKeyAndNamespace(
byte[] serializedKeyAndNamespace,
@@ -376,22 +375,24 @@ public final class KvStateRequestSerializer {
0,
serializedKeyAndNamespace.length);
- K key = keySerializer.deserialize(dis);
- byte magicNumber = dis.readByte();
- if (magicNumber != 42) {
- throw new IllegalArgumentException("Unexpected magic number " + magicNumber +
- ". This indicates a mismatch in the key serializers used by the " +
- "KvState instance and this access.");
- }
- N namespace = namespaceSerializer.deserialize(dis);
+ try {
+ K key = keySerializer.deserialize(dis);
+ byte magicNumber = dis.readByte();
+ if (magicNumber != 42) {
+ throw new IOException("Unexpected magic number " + magicNumber + ".");
+ }
+ N namespace = namespaceSerializer.deserialize(dis);
- if (dis.available() > 0) {
- throw new IllegalArgumentException("Unconsumed bytes in the serialized key " +
- "and namespace. This indicates a mismatch in the key/namespace " +
- "serializers used by the KvState instance and this access.");
- }
+ if (dis.available() > 0) {
+ throw new IOException("Unconsumed bytes in the serialized key and namespace.");
+ }
- return new Tuple2<>(key, namespace);
+ return new Tuple2<>(key, namespace);
+ } catch (IOException e) {
+ throw new IOException("Unable to deserialize key " +
+ "and namespace. This indicates a mismatch in the key/namespace " +
+ "serializers used by the KvState instance and this access.", e);
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/21742b2d/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java
index e8caf57..b1ec86f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java
@@ -554,7 +554,7 @@ public class KvStateServerHandlerTest extends TestLogger {
assertEquals(KvStateRequestType.REQUEST_FAILURE, KvStateRequestSerializer.deserializeHeader(buf));
KvStateRequestFailure response = KvStateRequestSerializer.deserializeKvStateRequestFailure(buf);
assertEquals(182828, response.getRequestId());
- assertTrue(response.getCause().getMessage().contains("IllegalArgumentException"));
+ assertTrue(response.getCause().getMessage().contains("IOException"));
// Repeat with wrong namespace only
request = KvStateRequestSerializer.serializeKvStateRequest(
@@ -573,7 +573,7 @@ public class KvStateServerHandlerTest extends TestLogger {
assertEquals(KvStateRequestType.REQUEST_FAILURE, KvStateRequestSerializer.deserializeHeader(buf));
response = KvStateRequestSerializer.deserializeKvStateRequestFailure(buf);
assertEquals(182829, response.getRequestId());
- assertTrue(response.getCause().getMessage().contains("IllegalArgumentException"));
+ assertTrue(response.getCause().getMessage().contains("IOException"));
assertEquals(2, stats.getNumRequests());
assertEquals(2, stats.getNumFailed());
http://git-wip-us.apache.org/repos/asf/flink/blob/21742b2d/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
index 9d6d27c..0d9c2e4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
@@ -220,6 +220,46 @@ public class KvStateRequestSerializerTest {
}
/**
+ * Tests key and namespace deserialization utils with too few bytes.
+ */
+ @Test(expected = IOException.class)
+ public void testKeyAndNamespaceDeserializationEmpty() throws Exception {
+ KvStateRequestSerializer.deserializeKeyAndNamespace(
+ new byte[] {}, LongSerializer.INSTANCE, StringSerializer.INSTANCE);
+ }
+
+ /**
+ * Tests key and namespace deserialization utils with too few bytes.
+ */
+ @Test(expected = IOException.class)
+ public void testKeyAndNamespaceDeserializationTooShort() throws Exception {
+ KvStateRequestSerializer.deserializeKeyAndNamespace(
+ new byte[] {1}, LongSerializer.INSTANCE, StringSerializer.INSTANCE);
+ }
+
+ /**
+ * Tests key and namespace deserialization utils with too many bytes.
+ */
+ @Test(expected = IOException.class)
+ public void testKeyAndNamespaceDeserializationTooMany1() throws Exception {
+ // Long + null String + 1 byte
+ KvStateRequestSerializer.deserializeKeyAndNamespace(
+ new byte[] {1, 1, 1, 1, 1, 1, 1, 1, 42, 0, 2}, LongSerializer.INSTANCE,
+ StringSerializer.INSTANCE);
+ }
+
+ /**
+ * Tests key and namespace deserialization utils with too many bytes.
+ */
+ @Test(expected = IOException.class)
+ public void testKeyAndNamespaceDeserializationTooMany2() throws Exception {
+ // Long + null String + 2 bytes
+ KvStateRequestSerializer.deserializeKeyAndNamespace(
+ new byte[] {1, 1, 1, 1, 1, 1, 1, 1, 42, 0, 2, 2}, LongSerializer.INSTANCE,
+ StringSerializer.INSTANCE);
+ }
+
+ /**
* Tests value serialization utils.
*/
@Test