You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2018/07/20 17:05:40 UTC

flink git commit: [hotfix] [core] Align serialization methods in SimpleVersionedSerialization

Repository: flink
Updated Branches:
  refs/heads/release-1.6 f19337a7d -> 1ee705afa


[hotfix] [core] Align serialization methods in SimpleVersionedSerialization


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1ee705af
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1ee705af
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1ee705af

Branch: refs/heads/release-1.6
Commit: 1ee705afa7122e1b92fd3f1b12fe4c97c66ffd5b
Parents: f19337a
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Jul 20 17:24:52 2018 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Jul 20 18:56:28 2018 +0200

----------------------------------------------------------------------
 .../core/io/SimpleVersionedSerialization.java   | 33 ++++++--
 .../io/SimpleVersionedSerializationTest.java    | 81 ++++----------------
 2 files changed, 42 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1ee705af/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerialization.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerialization.java b/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerialization.java
index 8bead11..2c5b68c 100644
--- a/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerialization.java
+++ b/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerialization.java
@@ -110,7 +110,7 @@ public class SimpleVersionedSerialization {
 		checkNotNull(datum, "datum");
 
 		final byte[] data = serializer.serialize(datum);
-		final byte[] versionAndData = new byte[data.length + 4];
+		final byte[] versionAndData = new byte[data.length + 8];
 
 		final int version = serializer.getVersion();
 		versionAndData[0] = (byte) (version >> 24);
@@ -118,8 +118,14 @@ public class SimpleVersionedSerialization {
 		versionAndData[2] = (byte) (version >>  8);
 		versionAndData[3] = (byte)  version;
 
+		final int length = data.length;
+		versionAndData[4] = (byte) (length >> 24);
+		versionAndData[5] = (byte) (length >> 16);
+		versionAndData[6] = (byte) (length >>  8);
+		versionAndData[7] = (byte)  length;
+
 		// move the data to the array
-		System.arraycopy(data, 0, versionAndData, 4, data.length);
+		System.arraycopy(data, 0, versionAndData, 8, data.length);
 
 		return versionAndData;
 	}
@@ -142,14 +148,25 @@ public class SimpleVersionedSerialization {
 		checkNotNull(bytes, "bytes");
 		checkArgument(bytes.length >= 4, "byte array below minimum length (4 bytes)");
 
-		final byte[] dataOnly = Arrays.copyOfRange(bytes, 4, bytes.length);
+		final byte[] dataOnly = Arrays.copyOfRange(bytes, 8, bytes.length);
 		final int version =
 				((bytes[0] & 0xff) << 24) |
-						((bytes[1] & 0xff) << 16) |
-						((bytes[2] & 0xff) <<  8) |
-						(bytes[3] & 0xff);
-
-		return serializer.deserialize(version, dataOnly);
+				((bytes[1] & 0xff) << 16) |
+				((bytes[2] & 0xff) <<  8) |
+				(bytes[3] & 0xff);
+
+		final int length =
+				((bytes[4] & 0xff) << 24) |
+				((bytes[5] & 0xff) << 16) |
+				((bytes[6] & 0xff) <<  8) |
+				(bytes[7] & 0xff);
+
+		if (length == dataOnly.length) {
+			return serializer.deserialize(version, dataOnly);
+		}
+		else {
+			throw new IOException("Corrupt data, conflicting lengths. Length fields: " + length + ", data: " + dataOnly.length);
+		}
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/1ee705af/flink-core/src/test/java/org/apache/flink/core/io/SimpleVersionedSerializationTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/io/SimpleVersionedSerializationTest.java b/flink-core/src/test/java/org/apache/flink/core/io/SimpleVersionedSerializationTest.java
index 89a6b27..116d37c 100644
--- a/flink-core/src/test/java/org/apache/flink/core/io/SimpleVersionedSerializationTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/io/SimpleVersionedSerializationTest.java
@@ -26,8 +26,8 @@ import org.junit.Test;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
 
 /**
  * Tests for the {@link SimpleVersionedSerialization} class.
@@ -35,7 +35,7 @@ import static org.junit.Assert.assertNotNull;
 public class SimpleVersionedSerializationTest {
 
 	@Test
-	public void testStreamSerializationRoundTrip() throws IOException {
+	public void testSerializationRoundTrip() throws IOException {
 		final SimpleVersionedSerializer<String> utfEncoder = new SimpleVersionedSerializer<String>() {
 
 			private static final int VERSION = Integer.MAX_VALUE / 2; // version should occupy many bytes
@@ -60,14 +60,20 @@ public class SimpleVersionedSerializationTest {
 		final String testString = "dugfakgs";
 		final DataOutputSerializer out = new DataOutputSerializer(32);
 		SimpleVersionedSerialization.writeVersionAndSerialize(utfEncoder, testString, out);
+		final byte[] outBytes = out.getCopyOfBuffer();
+
+		final byte[] bytes = SimpleVersionedSerialization.writeVersionAndSerialize(utfEncoder, testString);
+		assertArrayEquals(bytes, outBytes);
 
-		final DataInputDeserializer in = new DataInputDeserializer(out.getCopyOfBuffer());
+		final DataInputDeserializer in = new DataInputDeserializer(bytes);
 		final String deserialized = SimpleVersionedSerialization.readVersionAndDeSerialize(utfEncoder, in);
+		final String deserializedFromBytes = SimpleVersionedSerialization.readVersionAndDeSerialize(utfEncoder, outBytes);
 		assertEquals(testString, deserialized);
+		assertEquals(testString, deserializedFromBytes);
 	}
 
 	@Test
-	public void testStreamSerializeEmpty() throws IOException {
+	public void testSerializeEmpty() throws IOException {
 		final String testString = "beeeep!";
 
 		SimpleVersionedSerializer<String> emptySerializer = new SimpleVersionedSerializer<String>() {
@@ -92,68 +98,15 @@ public class SimpleVersionedSerializationTest {
 
 		final DataOutputSerializer out = new DataOutputSerializer(32);
 		SimpleVersionedSerialization.writeVersionAndSerialize(emptySerializer, "abc", out);
+		final byte[] outBytes = out.getCopyOfBuffer();
 
-		final DataInputDeserializer in = new DataInputDeserializer(out.getCopyOfBuffer());
-		assertEquals(testString, SimpleVersionedSerialization.readVersionAndDeSerialize(emptySerializer, in));
-	}
-
-	@Test
-	public void testSerializationRoundTrip() throws IOException {
-		final SimpleVersionedSerializer<String> utfEncoder = new SimpleVersionedSerializer<String>() {
-
-			private static final int VERSION = Integer.MAX_VALUE / 2; // version should occupy many bytes
-
-			@Override
-			public int getVersion() {
-				return VERSION;
-			}
-
-			@Override
-			public byte[] serialize(String str) throws IOException {
-				return str.getBytes(StandardCharsets.UTF_8);
-			}
-
-			@Override
-			public String deserialize(int version, byte[] serialized) throws IOException {
-				assertEquals(VERSION, version);
-				return new String(serialized, StandardCharsets.UTF_8);
-			}
-		};
-
-		final String testString = "dugfakgs";
-		byte[] serialized = SimpleVersionedSerialization.writeVersionAndSerialize(utfEncoder, testString);
+		final byte[] bytes = SimpleVersionedSerialization.writeVersionAndSerialize(emptySerializer, "abc");
+		assertArrayEquals(bytes, outBytes);
 
-		final String deserialized = SimpleVersionedSerialization.readVersionAndDeSerialize(utfEncoder, serialized);
+		final DataInputDeserializer in = new DataInputDeserializer(bytes);
+		final String deserialized = SimpleVersionedSerialization.readVersionAndDeSerialize(emptySerializer, in);
+		final String deserializedFromBytes = SimpleVersionedSerialization.readVersionAndDeSerialize(emptySerializer, outBytes);
 		assertEquals(testString, deserialized);
-	}
-
-	@Test
-	public void testSerializeEmpty() throws IOException {
-		final String testString = "beeeep!";
-
-		SimpleVersionedSerializer<String> emptySerializer = new SimpleVersionedSerializer<String>() {
-
-			@Override
-			public int getVersion() {
-				return 42;
-			}
-
-			@Override
-			public byte[] serialize(String obj) throws IOException {
-				return new byte[0];
-			}
-
-			@Override
-			public String deserialize(int version, byte[] serialized) throws IOException {
-				assertEquals(42, version);
-				assertEquals(0, serialized.length);
-				return testString;
-			}
-		};
-
-		byte[] serialized = SimpleVersionedSerialization.writeVersionAndSerialize(emptySerializer, "abc");
-		assertNotNull(serialized);
-
-		assertEquals(testString, SimpleVersionedSerialization.readVersionAndDeSerialize(emptySerializer, serialized));
+		assertEquals(testString, deserializedFromBytes);
 	}
 }