You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2018/07/20 17:05:40 UTC
flink git commit: [hotfix] [core] Align serialization methods in
SimpleVersionedSerialization
Repository: flink
Updated Branches:
refs/heads/release-1.6 f19337a7d -> 1ee705afa
[hotfix] [core] Align serialization methods in SimpleVersionedSerialization
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1ee705af
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1ee705af
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1ee705af
Branch: refs/heads/release-1.6
Commit: 1ee705afa7122e1b92fd3f1b12fe4c97c66ffd5b
Parents: f19337a
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Jul 20 17:24:52 2018 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Jul 20 18:56:28 2018 +0200
----------------------------------------------------------------------
.../core/io/SimpleVersionedSerialization.java | 33 ++++++--
.../io/SimpleVersionedSerializationTest.java | 81 ++++----------------
2 files changed, 42 insertions(+), 72 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/1ee705af/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerialization.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerialization.java b/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerialization.java
index 8bead11..2c5b68c 100644
--- a/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerialization.java
+++ b/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerialization.java
@@ -110,7 +110,7 @@ public class SimpleVersionedSerialization {
checkNotNull(datum, "datum");
final byte[] data = serializer.serialize(datum);
- final byte[] versionAndData = new byte[data.length + 4];
+ final byte[] versionAndData = new byte[data.length + 8];
final int version = serializer.getVersion();
versionAndData[0] = (byte) (version >> 24);
@@ -118,8 +118,14 @@ public class SimpleVersionedSerialization {
versionAndData[2] = (byte) (version >> 8);
versionAndData[3] = (byte) version;
+ final int length = data.length;
+ versionAndData[4] = (byte) (length >> 24);
+ versionAndData[5] = (byte) (length >> 16);
+ versionAndData[6] = (byte) (length >> 8);
+ versionAndData[7] = (byte) length;
+
// move the data to the array
- System.arraycopy(data, 0, versionAndData, 4, data.length);
+ System.arraycopy(data, 0, versionAndData, 8, data.length);
return versionAndData;
}
@@ -142,14 +148,25 @@ public class SimpleVersionedSerialization {
checkNotNull(bytes, "bytes");
checkArgument(bytes.length >= 4, "byte array below minimum length (4 bytes)");
- final byte[] dataOnly = Arrays.copyOfRange(bytes, 4, bytes.length);
+ final byte[] dataOnly = Arrays.copyOfRange(bytes, 8, bytes.length);
final int version =
((bytes[0] & 0xff) << 24) |
- ((bytes[1] & 0xff) << 16) |
- ((bytes[2] & 0xff) << 8) |
- (bytes[3] & 0xff);
-
- return serializer.deserialize(version, dataOnly);
+ ((bytes[1] & 0xff) << 16) |
+ ((bytes[2] & 0xff) << 8) |
+ (bytes[3] & 0xff);
+
+ final int length =
+ ((bytes[4] & 0xff) << 24) |
+ ((bytes[5] & 0xff) << 16) |
+ ((bytes[6] & 0xff) << 8) |
+ (bytes[7] & 0xff);
+
+ if (length == dataOnly.length) {
+ return serializer.deserialize(version, dataOnly);
+ }
+ else {
+ throw new IOException("Corrupt data, conflicting lengths. Length fields: " + length + ", data: " + dataOnly.length);
+ }
}
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/1ee705af/flink-core/src/test/java/org/apache/flink/core/io/SimpleVersionedSerializationTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/io/SimpleVersionedSerializationTest.java b/flink-core/src/test/java/org/apache/flink/core/io/SimpleVersionedSerializationTest.java
index 89a6b27..116d37c 100644
--- a/flink-core/src/test/java/org/apache/flink/core/io/SimpleVersionedSerializationTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/io/SimpleVersionedSerializationTest.java
@@ -26,8 +26,8 @@ import org.junit.Test;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
+import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
/**
* Tests for the {@link SimpleVersionedSerialization} class.
@@ -35,7 +35,7 @@ import static org.junit.Assert.assertNotNull;
public class SimpleVersionedSerializationTest {
@Test
- public void testStreamSerializationRoundTrip() throws IOException {
+ public void testSerializationRoundTrip() throws IOException {
final SimpleVersionedSerializer<String> utfEncoder = new SimpleVersionedSerializer<String>() {
private static final int VERSION = Integer.MAX_VALUE / 2; // version should occupy many bytes
@@ -60,14 +60,20 @@ public class SimpleVersionedSerializationTest {
final String testString = "dugfakgs";
final DataOutputSerializer out = new DataOutputSerializer(32);
SimpleVersionedSerialization.writeVersionAndSerialize(utfEncoder, testString, out);
+ final byte[] outBytes = out.getCopyOfBuffer();
+
+ final byte[] bytes = SimpleVersionedSerialization.writeVersionAndSerialize(utfEncoder, testString);
+ assertArrayEquals(bytes, outBytes);
- final DataInputDeserializer in = new DataInputDeserializer(out.getCopyOfBuffer());
+ final DataInputDeserializer in = new DataInputDeserializer(bytes);
final String deserialized = SimpleVersionedSerialization.readVersionAndDeSerialize(utfEncoder, in);
+ final String deserializedFromBytes = SimpleVersionedSerialization.readVersionAndDeSerialize(utfEncoder, outBytes);
assertEquals(testString, deserialized);
+ assertEquals(testString, deserializedFromBytes);
}
@Test
- public void testStreamSerializeEmpty() throws IOException {
+ public void testSerializeEmpty() throws IOException {
final String testString = "beeeep!";
SimpleVersionedSerializer<String> emptySerializer = new SimpleVersionedSerializer<String>() {
@@ -92,68 +98,15 @@ public class SimpleVersionedSerializationTest {
final DataOutputSerializer out = new DataOutputSerializer(32);
SimpleVersionedSerialization.writeVersionAndSerialize(emptySerializer, "abc", out);
+ final byte[] outBytes = out.getCopyOfBuffer();
- final DataInputDeserializer in = new DataInputDeserializer(out.getCopyOfBuffer());
- assertEquals(testString, SimpleVersionedSerialization.readVersionAndDeSerialize(emptySerializer, in));
- }
-
- @Test
- public void testSerializationRoundTrip() throws IOException {
- final SimpleVersionedSerializer<String> utfEncoder = new SimpleVersionedSerializer<String>() {
-
- private static final int VERSION = Integer.MAX_VALUE / 2; // version should occupy many bytes
-
- @Override
- public int getVersion() {
- return VERSION;
- }
-
- @Override
- public byte[] serialize(String str) throws IOException {
- return str.getBytes(StandardCharsets.UTF_8);
- }
-
- @Override
- public String deserialize(int version, byte[] serialized) throws IOException {
- assertEquals(VERSION, version);
- return new String(serialized, StandardCharsets.UTF_8);
- }
- };
-
- final String testString = "dugfakgs";
- byte[] serialized = SimpleVersionedSerialization.writeVersionAndSerialize(utfEncoder, testString);
+ final byte[] bytes = SimpleVersionedSerialization.writeVersionAndSerialize(emptySerializer, "abc");
+ assertArrayEquals(bytes, outBytes);
- final String deserialized = SimpleVersionedSerialization.readVersionAndDeSerialize(utfEncoder, serialized);
+ final DataInputDeserializer in = new DataInputDeserializer(bytes);
+ final String deserialized = SimpleVersionedSerialization.readVersionAndDeSerialize(emptySerializer, in);
+ final String deserializedFromBytes = SimpleVersionedSerialization.readVersionAndDeSerialize(emptySerializer, outBytes);
assertEquals(testString, deserialized);
- }
-
- @Test
- public void testSerializeEmpty() throws IOException {
- final String testString = "beeeep!";
-
- SimpleVersionedSerializer<String> emptySerializer = new SimpleVersionedSerializer<String>() {
-
- @Override
- public int getVersion() {
- return 42;
- }
-
- @Override
- public byte[] serialize(String obj) throws IOException {
- return new byte[0];
- }
-
- @Override
- public String deserialize(int version, byte[] serialized) throws IOException {
- assertEquals(42, version);
- assertEquals(0, serialized.length);
- return testString;
- }
- };
-
- byte[] serialized = SimpleVersionedSerialization.writeVersionAndSerialize(emptySerializer, "abc");
- assertNotNull(serialized);
-
- assertEquals(testString, SimpleVersionedSerialization.readVersionAndDeSerialize(emptySerializer, serialized));
+ assertEquals(testString, deserializedFromBytes);
}
}