You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/05/16 15:25:12 UTC

[flink] 08/14: [FLINK-17694][core] Fix argument check for SimpleVersionedSerialization

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3ba9b38cd750d12db759007235cc7f8497ee4170
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Sun May 10 19:40:15 2020 +0200

    [FLINK-17694][core] Fix argument check for SimpleVersionedSerialization
---
 .../core/io/SimpleVersionedSerialization.java      |  2 +-
 .../core/io/SimpleVersionedSerializationTest.java  | 49 +++++++++++++---------
 2 files changed, 30 insertions(+), 21 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerialization.java b/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerialization.java
index 2c5b68c..92c2bb9 100644
--- a/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerialization.java
+++ b/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerialization.java
@@ -146,7 +146,7 @@ public class SimpleVersionedSerialization {
 	public static <T> T readVersionAndDeSerialize(SimpleVersionedSerializer<T> serializer, byte[] bytes) throws IOException {
 		checkNotNull(serializer, "serializer");
 		checkNotNull(bytes, "bytes");
-		checkArgument(bytes.length >= 4, "byte array below minimum length (4 bytes)");
+		checkArgument(bytes.length >= 8, "byte array below minimum length (8 bytes)");
 
 		final byte[] dataOnly = Arrays.copyOfRange(bytes, 8, bytes.length);
 		final int version =
diff --git a/flink-core/src/test/java/org/apache/flink/core/io/SimpleVersionedSerializationTest.java b/flink-core/src/test/java/org/apache/flink/core/io/SimpleVersionedSerializationTest.java
index 116d37c..d3648e4 100644
--- a/flink-core/src/test/java/org/apache/flink/core/io/SimpleVersionedSerializationTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/io/SimpleVersionedSerializationTest.java
@@ -36,26 +36,7 @@ public class SimpleVersionedSerializationTest {
 
 	@Test
 	public void testSerializationRoundTrip() throws IOException {
-		final SimpleVersionedSerializer<String> utfEncoder = new SimpleVersionedSerializer<String>() {
-
-			private static final int VERSION = Integer.MAX_VALUE / 2; // version should occupy many bytes
-
-			@Override
-			public int getVersion() {
-				return VERSION;
-			}
-
-			@Override
-			public byte[] serialize(String str) throws IOException {
-				return str.getBytes(StandardCharsets.UTF_8);
-			}
-
-			@Override
-			public String deserialize(int version, byte[] serialized) throws IOException {
-				assertEquals(VERSION, version);
-				return new String(serialized, StandardCharsets.UTF_8);
-			}
-		};
+		final SimpleVersionedSerializer<String> utfEncoder = new TestStringSerializer();
 
 		final String testString = "dugfakgs";
 		final DataOutputSerializer out = new DataOutputSerializer(32);
@@ -109,4 +90,32 @@ public class SimpleVersionedSerializationTest {
 		assertEquals(testString, deserialized);
 		assertEquals(testString, deserializedFromBytes);
 	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void testUnderflow() throws Exception { // a 7-byte array is one byte short of the 8-byte minimum and must be rejected
+		SimpleVersionedSerialization.readVersionAndDeSerialize(new TestStringSerializer(), new byte[7]);
+	}
+
+	// ------------------------------------------------------------------------
+
+	private static final class TestStringSerializer implements SimpleVersionedSerializer<String> {
+		// Shared test serializer: encodes strings as UTF-8 and asserts the version handed to deserialize().
+		private static final int VERSION = Integer.MAX_VALUE / 2; // version should occupy many bytes (0x3FFFFFFF: all four bytes non-zero)
+
+		@Override
+		public int getVersion() {
+			return VERSION;
+		}
+
+		@Override
+		public byte[] serialize(String str) throws IOException { // serialized form is the string's raw UTF-8 bytes
+			return str.getBytes(StandardCharsets.UTF_8);
+		}
+
+		@Override
+		public String deserialize(int version, byte[] serialized) throws IOException {
+			assertEquals(VERSION, version); // fails the test if called with any version other than VERSION
+			return new String(serialized, StandardCharsets.UTF_8);
+		}
+	}
 }