You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/05/16 15:25:12 UTC
[flink] 08/14: [FLINK-17694][core] Fix argument check for SimpleVersionedSerialization
This is an automated email from the ASF dual-hosted git repository.
sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 3ba9b38cd750d12db759007235cc7f8497ee4170
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Sun May 10 19:40:15 2020 +0200
[FLINK-17694][core] Fix argument check for SimpleVersionedSerialization
---
.../core/io/SimpleVersionedSerialization.java | 2 +-
.../core/io/SimpleVersionedSerializationTest.java | 49 +++++++++++++---------
2 files changed, 30 insertions(+), 21 deletions(-)
diff --git a/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerialization.java b/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerialization.java
index 2c5b68c..92c2bb9 100644
--- a/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerialization.java
+++ b/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerialization.java
@@ -146,7 +146,7 @@ public class SimpleVersionedSerialization {
public static <T> T readVersionAndDeSerialize(SimpleVersionedSerializer<T> serializer, byte[] bytes) throws IOException {
checkNotNull(serializer, "serializer");
checkNotNull(bytes, "bytes");
- checkArgument(bytes.length >= 4, "byte array below minimum length (4 bytes)");
+ checkArgument(bytes.length >= 8, "byte array below minimum length (8 bytes)");
final byte[] dataOnly = Arrays.copyOfRange(bytes, 8, bytes.length);
final int version =
diff --git a/flink-core/src/test/java/org/apache/flink/core/io/SimpleVersionedSerializationTest.java b/flink-core/src/test/java/org/apache/flink/core/io/SimpleVersionedSerializationTest.java
index 116d37c..d3648e4 100644
--- a/flink-core/src/test/java/org/apache/flink/core/io/SimpleVersionedSerializationTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/io/SimpleVersionedSerializationTest.java
@@ -36,26 +36,7 @@ public class SimpleVersionedSerializationTest {
@Test
public void testSerializationRoundTrip() throws IOException {
- final SimpleVersionedSerializer<String> utfEncoder = new SimpleVersionedSerializer<String>() {
-
- private static final int VERSION = Integer.MAX_VALUE / 2; // version should occupy many bytes
-
- @Override
- public int getVersion() {
- return VERSION;
- }
-
- @Override
- public byte[] serialize(String str) throws IOException {
- return str.getBytes(StandardCharsets.UTF_8);
- }
-
- @Override
- public String deserialize(int version, byte[] serialized) throws IOException {
- assertEquals(VERSION, version);
- return new String(serialized, StandardCharsets.UTF_8);
- }
- };
+ final SimpleVersionedSerializer<String> utfEncoder = new TestStringSerializer();
final String testString = "dugfakgs";
final DataOutputSerializer out = new DataOutputSerializer(32);
@@ -109,4 +90,32 @@ public class SimpleVersionedSerializationTest {
assertEquals(testString, deserialized);
assertEquals(testString, deserializedFromBytes);
}
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testUnderflow() throws Exception {
+ SimpleVersionedSerialization.readVersionAndDeSerialize(new TestStringSerializer(), new byte[7]);
+ }
+
+ // ------------------------------------------------------------------------
+
+ private static final class TestStringSerializer implements SimpleVersionedSerializer<String> {
+
+ private static final int VERSION = Integer.MAX_VALUE / 2; // version should occupy many bytes
+
+ @Override
+ public int getVersion() {
+ return VERSION;
+ }
+
+ @Override
+ public byte[] serialize(String str) throws IOException {
+ return str.getBytes(StandardCharsets.UTF_8);
+ }
+
+ @Override
+ public String deserialize(int version, byte[] serialized) throws IOException {
+ assertEquals(VERSION, version);
+ return new String(serialized, StandardCharsets.UTF_8);
+ }
+ }
}