You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/11/02 21:22:32 UTC
[3/3] flink git commit: [FLINK-7968] [core] Move DataOutputSerializer
and DataInputDeserializer to 'flink-core'
[FLINK-7968] [core] Move DataOutputSerializer and DataInputDeserializer to 'flink-core'
These core flink utils are independent of any other runtime classes and
are also used both in flink-runtime and in flink-queryable-state (which duplicated
the code).
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/37df826e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/37df826e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/37df826e
Branch: refs/heads/master
Commit: 37df826e4e1355a2cba89a85ea94257d40785cb2
Parents: 198b74a
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Nov 2 18:27:23 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Nov 2 19:20:06 2017 +0100
----------------------------------------------------------------------
...eInformationKeyValueSerializationSchema.java | 4 +-
.../core/memory/DataInputDeserializer.java | 387 ++++++++++++++++++
.../flink/core/memory/DataOutputSerializer.java | 337 ++++++++++++++++
.../core/memory/DataInputDeserializerTest.java | 59 +++
.../memory/DataInputOutputSerializerTest.java | 123 ++++++
.../serialization/types/AsciiStringType.java | 85 ++++
.../serialization/types/BooleanType.java | 74 ++++
.../serialization/types/ByteArrayType.java | 83 ++++
.../serialization/types/ByteSubArrayType.java | 98 +++++
.../testutils/serialization/types/ByteType.java | 74 ++++
.../testutils/serialization/types/CharType.java | 75 ++++
.../serialization/types/DoubleType.java | 75 ++++
.../serialization/types/FloatType.java | 74 ++++
.../testutils/serialization/types/IntType.java | 74 ++++
.../testutils/serialization/types/LongType.java | 74 ++++
.../types/SerializationTestType.java | 32 ++
.../types/SerializationTestTypeFactory.java | 47 +++
.../serialization/types/ShortType.java | 74 ++++
.../serialization/types/UnsignedByteType.java | 74 ++++
.../serialization/types/UnsignedShortType.java | 74 ++++
.../testutils/serialization/types/Util.java | 106 +++++
.../serialization/DataInputDeserializer.java | 392 -------------------
.../serialization/DataOutputSerializer.java | 344 ----------------
.../state/serialization/KvStateSerializer.java | 2 +
.../AdaptiveSpanningRecordDeserializer.java | 4 +-
.../api/serialization/EventSerializer.java | 4 +-
.../serialization/SpanningRecordSerializer.java | 2 +-
...llingAdaptiveSpanningRecordDeserializer.java | 2 +-
.../metrics/dump/MetricDumpSerialization.java | 4 +-
.../runtime/util/DataInputDeserializer.java | 390 ------------------
.../runtime/util/DataOutputSerializer.java | 342 ----------------
.../io/network/api/CheckpointBarrierTest.java | 4 +-
.../api/serialization/PagedViewsTest.java | 6 +-
.../SpanningRecordSerializationTest.java | 6 +-
.../SpanningRecordSerializerTest.java | 6 +-
.../serialization/types/AsciiStringType.java | 85 ----
.../api/serialization/types/BooleanType.java | 74 ----
.../api/serialization/types/ByteArrayType.java | 83 ----
.../serialization/types/ByteSubArrayType.java | 98 -----
.../api/serialization/types/ByteType.java | 74 ----
.../api/serialization/types/CharType.java | 75 ----
.../api/serialization/types/DoubleType.java | 75 ----
.../api/serialization/types/FloatType.java | 74 ----
.../api/serialization/types/IntType.java | 74 ----
.../api/serialization/types/LongType.java | 74 ----
.../types/SerializationTestType.java | 32 --
.../types/SerializationTestTypeFactory.java | 47 ---
.../api/serialization/types/ShortType.java | 74 ----
.../serialization/types/UnsignedByteType.java | 74 ----
.../serialization/types/UnsignedShortType.java | 74 ----
.../network/api/serialization/types/Util.java | 106 -----
.../network/serialization/LargeRecordsTest.java | 4 +-
.../serialization/types/LargeObjectType.java | 2 +-
.../runtime/util/DataInputDeserializerTest.java | 59 ---
.../util/DataInputOutputSerializerTest.java | 122 ------
.../source/SerializedCheckpointData.java | 4 +-
.../TypeInformationSerializationSchema.java | 4 +-
.../StreamElementSerializerTest.java | 4 +-
58 files changed, 2131 insertions(+), 2872 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java
index 3e0cdb5..96b8879 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java
@@ -24,8 +24,8 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.runtime.util.DataInputDeserializer;
-import org.apache.flink.runtime.util.DataOutputSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
import java.io.IOException;
http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-core/src/main/java/org/apache/flink/core/memory/DataInputDeserializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/DataInputDeserializer.java b/flink-core/src/main/java/org/apache/flink/core/memory/DataInputDeserializer.java
new file mode 100644
index 0000000..088f9d2
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/DataInputDeserializer.java
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.memory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.UTFDataFormatException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+/**
+ * A simple and efficient deserializer for the {@link java.io.DataInput} interface.
+ */
+public class DataInputDeserializer implements DataInputView, java.io.Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ // ------------------------------------------------------------------------
+
+ private byte[] buffer;
+
+ private int end;
+
+ private int position;
+
+ // ------------------------------------------------------------------------
+
+ public DataInputDeserializer() {}
+
+ public DataInputDeserializer(byte[] buffer) {
+ setBuffer(buffer, 0, buffer.length);
+ }
+
+ public DataInputDeserializer(byte[] buffer, int start, int len) {
+ setBuffer(buffer, start, len);
+ }
+
+ public DataInputDeserializer(ByteBuffer buffer) {
+ setBuffer(buffer);
+ }
+
+ // ------------------------------------------------------------------------
+ // Changing buffers
+ // ------------------------------------------------------------------------
+
+ public void setBuffer(ByteBuffer buffer) {
+ if (buffer.hasArray()) {
+ this.buffer = buffer.array();
+ this.position = buffer.arrayOffset() + buffer.position();
+ this.end = this.position + buffer.remaining();
+ } else if (buffer.isDirect()) {
+ this.buffer = new byte[buffer.remaining()];
+ this.position = 0;
+ this.end = this.buffer.length;
+
+ buffer.get(this.buffer);
+ } else {
+ throw new IllegalArgumentException("The given buffer is neither an array-backed heap ByteBuffer, nor a direct ByteBuffer.");
+ }
+ }
+
+ public void setBuffer(byte[] buffer, int start, int len) {
+ if (buffer == null) {
+ throw new NullPointerException();
+ }
+
+ if (start < 0 || len < 0 || start + len > buffer.length) {
+ throw new IllegalArgumentException();
+ }
+
+ this.buffer = buffer;
+ this.position = start;
+ this.end = start + len;
+ }
+
+ public void releaseArrays() {
+ this.buffer = null;
+ }
+
+ // ----------------------------------------------------------------------------------------
+ // Data Input
+ // ----------------------------------------------------------------------------------------
+
+ public int available() {
+ if (position < end) {
+ return end - position;
+ } else {
+ return 0;
+ }
+ }
+
+ @Override
+ public boolean readBoolean() throws IOException {
+ if (this.position < this.end) {
+ return this.buffer[this.position++] != 0;
+ } else {
+ throw new EOFException();
+ }
+ }
+
+ @Override
+ public byte readByte() throws IOException {
+ if (this.position < this.end) {
+ return this.buffer[this.position++];
+ } else {
+ throw new EOFException();
+ }
+ }
+
+ @Override
+ public char readChar() throws IOException {
+ if (this.position < this.end - 1) {
+ return (char) (((this.buffer[this.position++] & 0xff) << 8) | (this.buffer[this.position++] & 0xff));
+ } else {
+ throw new EOFException();
+ }
+ }
+
+ @Override
+ public double readDouble() throws IOException {
+ return Double.longBitsToDouble(readLong());
+ }
+
+ @Override
+ public float readFloat() throws IOException {
+ return Float.intBitsToFloat(readInt());
+ }
+
+ @Override
+ public void readFully(byte[] b) throws IOException {
+ readFully(b, 0, b.length);
+ }
+
+ @Override
+ public void readFully(byte[] b, int off, int len) throws IOException {
+ if (len >= 0) {
+ if (off <= b.length - len) {
+ if (this.position <= this.end - len) {
+ System.arraycopy(this.buffer, position, b, off, len);
+ position += len;
+ } else {
+ throw new EOFException();
+ }
+ } else {
+ throw new ArrayIndexOutOfBoundsException();
+ }
+ } else if (len < 0) {
+ throw new IllegalArgumentException("Length may not be negative.");
+ }
+ }
+
+ @Override
+ public int readInt() throws IOException {
+ if (this.position >= 0 && this.position < this.end - 3) {
+ @SuppressWarnings("restriction")
+ int value = UNSAFE.getInt(this.buffer, BASE_OFFSET + this.position);
+ if (LITTLE_ENDIAN) {
+ value = Integer.reverseBytes(value);
+ }
+
+ this.position += 4;
+ return value;
+ } else {
+ throw new EOFException();
+ }
+ }
+
+ @Override
+ public String readLine() throws IOException {
+ if (this.position < this.end) {
+ // read until a newline is found
+ StringBuilder bld = new StringBuilder();
+ char curr = (char) readUnsignedByte();
+ while (position < this.end && curr != '\n') {
+ bld.append(curr);
+ curr = (char) readUnsignedByte();
+ }
+ // trim a trailing carriage return
+ int len = bld.length();
+ if (len > 0 && bld.charAt(len - 1) == '\r') {
+ bld.setLength(len - 1);
+ }
+ String s = bld.toString();
+ bld.setLength(0);
+ return s;
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ public long readLong() throws IOException {
+ if (position >= 0 && position < this.end - 7) {
+ @SuppressWarnings("restriction")
+ long value = UNSAFE.getLong(this.buffer, BASE_OFFSET + this.position);
+ if (LITTLE_ENDIAN) {
+ value = Long.reverseBytes(value);
+ }
+ this.position += 8;
+ return value;
+ } else {
+ throw new EOFException();
+ }
+ }
+
+ @Override
+ public short readShort() throws IOException {
+ if (position >= 0 && position < this.end - 1) {
+ return (short) ((((this.buffer[position++]) & 0xff) << 8) | ((this.buffer[position++]) & 0xff));
+ } else {
+ throw new EOFException();
+ }
+ }
+
+ @Override
+ public String readUTF() throws IOException {
+ int utflen = readUnsignedShort();
+ byte[] bytearr = new byte[utflen];
+ char[] chararr = new char[utflen];
+
+ int c, char2, char3;
+ int count = 0;
+ int chararrCount = 0;
+
+ readFully(bytearr, 0, utflen);
+
+ while (count < utflen) {
+ c = (int) bytearr[count] & 0xff;
+ if (c > 127) {
+ break;
+ }
+ count++;
+ chararr[chararrCount++] = (char) c;
+ }
+
+ while (count < utflen) {
+ c = (int) bytearr[count] & 0xff;
+ switch (c >> 4) {
+ case 0:
+ case 1:
+ case 2:
+ case 3:
+ case 4:
+ case 5:
+ case 6:
+ case 7:
+ /* 0xxxxxxx */
+ count++;
+ chararr[chararrCount++] = (char) c;
+ break;
+ case 12:
+ case 13:
+ /* 110x xxxx 10xx xxxx */
+ count += 2;
+ if (count > utflen) {
+ throw new UTFDataFormatException("malformed input: partial character at end");
+ }
+ char2 = (int) bytearr[count - 1];
+ if ((char2 & 0xC0) != 0x80) {
+ throw new UTFDataFormatException("malformed input around byte " + count);
+ }
+ chararr[chararrCount++] = (char) (((c & 0x1F) << 6) | (char2 & 0x3F));
+ break;
+ case 14:
+ /* 1110 xxxx 10xx xxxx 10xx xxxx */
+ count += 3;
+ if (count > utflen) {
+ throw new UTFDataFormatException("malformed input: partial character at end");
+ }
+ char2 = (int) bytearr[count - 2];
+ char3 = (int) bytearr[count - 1];
+ if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80)) {
+ throw new UTFDataFormatException("malformed input around byte " + (count - 1));
+ }
+ chararr[chararrCount++] = (char) (((c & 0x0F) << 12) | ((char2 & 0x3F) << 6) | (char3 & 0x3F));
+ break;
+ default:
+ /* 10xx xxxx, 1111 xxxx */
+ throw new UTFDataFormatException("malformed input around byte " + count);
+ }
+ }
+ // The number of chars produced may be less than utflen
+ return new String(chararr, 0, chararrCount);
+ }
+
+ @Override
+ public int readUnsignedByte() throws IOException {
+ if (this.position < this.end) {
+ return (this.buffer[this.position++] & 0xff);
+ } else {
+ throw new EOFException();
+ }
+ }
+
+ @Override
+ public int readUnsignedShort() throws IOException {
+ if (this.position < this.end - 1) {
+ return ((this.buffer[this.position++] & 0xff) << 8) | (this.buffer[this.position++] & 0xff);
+ } else {
+ throw new EOFException();
+ }
+ }
+
+ @Override
+ public int skipBytes(int n) throws IOException {
+ if (this.position <= this.end - n) {
+ this.position += n;
+ return n;
+ } else {
+ n = this.end - this.position;
+ this.position = this.end;
+ return n;
+ }
+ }
+
+ @Override
+ public void skipBytesToRead(int numBytes) throws IOException {
+ int skippedBytes = skipBytes(numBytes);
+
+ if (skippedBytes < numBytes){
+ throw new EOFException("Could not skip " + numBytes + " bytes.");
+ }
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ if (b == null){
+ throw new NullPointerException("Byte array b cannot be null.");
+ }
+
+ if (off < 0){
+ throw new IndexOutOfBoundsException("Offset cannot be negative.");
+ }
+
+ if (len < 0){
+ throw new IndexOutOfBoundsException("Length cannot be negative.");
+ }
+
+ if (b.length - off < len){
+ throw new IndexOutOfBoundsException("Byte array does not provide enough space to store requested data" +
+ ".");
+ }
+
+ if (this.position >= this.end) {
+ return -1;
+ } else {
+ int toRead = Math.min(this.end - this.position, len);
+ System.arraycopy(this.buffer, this.position, b, off, toRead);
+ this.position += toRead;
+
+ return toRead;
+ }
+ }
+
+ @Override
+ public int read(byte[] b) throws IOException {
+ return read(b, 0, b.length);
+ }
+
+ // ------------------------------------------------------------------------
+ // Utilities
+ // ------------------------------------------------------------------------
+
+ @SuppressWarnings("restriction")
+ private static final sun.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE;
+
+ @SuppressWarnings("restriction")
+ private static final long BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class);
+
+ private static final boolean LITTLE_ENDIAN = (MemoryUtils.NATIVE_BYTE_ORDER == ByteOrder.LITTLE_ENDIAN);
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputSerializer.java b/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputSerializer.java
new file mode 100644
index 0000000..7b8acb7
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputSerializer.java
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.memory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.UTFDataFormatException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Arrays;
+
+/**
+ * A simple and efficient serializer for the {@link java.io.DataOutput} interface.
+ */
+public class DataOutputSerializer implements DataOutputView {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DataOutputSerializer.class);
+
+ private static final int PRUNE_BUFFER_THRESHOLD = 5 * 1024 * 1024;
+
+ // ------------------------------------------------------------------------
+
+ private final byte[] startBuffer;
+
+ private byte[] buffer;
+
+ private int position;
+
+ private ByteBuffer wrapper;
+
+ // ------------------------------------------------------------------------
+
+ public DataOutputSerializer(int startSize) {
+ if (startSize < 1) {
+ throw new IllegalArgumentException();
+ }
+
+ this.startBuffer = new byte[startSize];
+ this.buffer = this.startBuffer;
+ this.wrapper = ByteBuffer.wrap(buffer);
+ }
+
+ public ByteBuffer wrapAsByteBuffer() {
+ this.wrapper.position(0);
+ this.wrapper.limit(this.position);
+ return this.wrapper;
+ }
+
+ public byte[] getByteArray() {
+ return buffer;
+ }
+
+ public byte[] getCopyOfBuffer() {
+ return Arrays.copyOf(buffer, position);
+ }
+
+ public void clear() {
+ this.position = 0;
+ }
+
+ public int length() {
+ return this.position;
+ }
+
+ public void pruneBuffer() {
+ if (this.buffer.length > PRUNE_BUFFER_THRESHOLD) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Releasing serialization buffer of " + this.buffer.length + " bytes.");
+ }
+
+ this.buffer = this.startBuffer;
+ this.wrapper = ByteBuffer.wrap(this.buffer);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return String.format("[pos=%d cap=%d]", this.position, this.buffer.length);
+ }
+
+ // ----------------------------------------------------------------------------------------
+ // Data Output
+ // ----------------------------------------------------------------------------------------
+
+ @Override
+ public void write(int b) throws IOException {
+ if (this.position >= this.buffer.length) {
+ resize(1);
+ }
+ this.buffer[this.position++] = (byte) (b & 0xff);
+ }
+
+ @Override
+ public void write(byte[] b) throws IOException {
+ write(b, 0, b.length);
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ if (len < 0 || off > b.length - len) {
+ throw new ArrayIndexOutOfBoundsException();
+ }
+ if (this.position > this.buffer.length - len) {
+ resize(len);
+ }
+ System.arraycopy(b, off, this.buffer, this.position, len);
+ this.position += len;
+ }
+
+ @Override
+ public void writeBoolean(boolean v) throws IOException {
+ write(v ? 1 : 0);
+ }
+
+ @Override
+ public void writeByte(int v) throws IOException {
+ write(v);
+ }
+
+ @Override
+ public void writeBytes(String s) throws IOException {
+ final int sLen = s.length();
+ if (this.position >= this.buffer.length - sLen) {
+ resize(sLen);
+ }
+
+ for (int i = 0; i < sLen; i++) {
+ writeByte(s.charAt(i));
+ }
+ this.position += sLen;
+ }
+
+ @Override
+ public void writeChar(int v) throws IOException {
+ if (this.position >= this.buffer.length - 1) {
+ resize(2);
+ }
+ this.buffer[this.position++] = (byte) (v >> 8);
+ this.buffer[this.position++] = (byte) v;
+ }
+
+ @Override
+ public void writeChars(String s) throws IOException {
+ final int sLen = s.length();
+ if (this.position >= this.buffer.length - 2 * sLen) {
+ resize(2 * sLen);
+ }
+ for (int i = 0; i < sLen; i++) {
+ writeChar(s.charAt(i));
+ }
+ }
+
+ @Override
+ public void writeDouble(double v) throws IOException {
+ writeLong(Double.doubleToLongBits(v));
+ }
+
+ @Override
+ public void writeFloat(float v) throws IOException {
+ writeInt(Float.floatToIntBits(v));
+ }
+
+ @SuppressWarnings("restriction")
+ @Override
+ public void writeInt(int v) throws IOException {
+ if (this.position >= this.buffer.length - 3) {
+ resize(4);
+ }
+ if (LITTLE_ENDIAN) {
+ v = Integer.reverseBytes(v);
+ }
+ UNSAFE.putInt(this.buffer, BASE_OFFSET + this.position, v);
+ this.position += 4;
+ }
+
+ @SuppressWarnings("restriction")
+ @Override
+ public void writeLong(long v) throws IOException {
+ if (this.position >= this.buffer.length - 7) {
+ resize(8);
+ }
+ if (LITTLE_ENDIAN) {
+ v = Long.reverseBytes(v);
+ }
+ UNSAFE.putLong(this.buffer, BASE_OFFSET + this.position, v);
+ this.position += 8;
+ }
+
+ @Override
+ public void writeShort(int v) throws IOException {
+ if (this.position >= this.buffer.length - 1) {
+ resize(2);
+ }
+ this.buffer[this.position++] = (byte) ((v >>> 8) & 0xff);
+ this.buffer[this.position++] = (byte) (v & 0xff);
+ }
+
+ @Override
+ public void writeUTF(String str) throws IOException {
+ int strlen = str.length();
+ int utflen = 0;
+ int c;
+
+ /* use charAt instead of copying String to char array */
+ for (int i = 0; i < strlen; i++) {
+ c = str.charAt(i);
+ if ((c >= 0x0001) && (c <= 0x007F)) {
+ utflen++;
+ } else if (c > 0x07FF) {
+ utflen += 3;
+ } else {
+ utflen += 2;
+ }
+ }
+
+ if (utflen > 65535) {
+ throw new UTFDataFormatException("Encoded string is too long: " + utflen);
+ }
+ else if (this.position > this.buffer.length - utflen - 2) {
+ resize(utflen + 2);
+ }
+
+ byte[] bytearr = this.buffer;
+ int count = this.position;
+
+ bytearr[count++] = (byte) ((utflen >>> 8) & 0xFF);
+ bytearr[count++] = (byte) (utflen & 0xFF);
+
+ int i;
+ for (i = 0; i < strlen; i++) {
+ c = str.charAt(i);
+ if (!((c >= 0x0001) && (c <= 0x007F))) {
+ break;
+ }
+ bytearr[count++] = (byte) c;
+ }
+
+ for (; i < strlen; i++) {
+ c = str.charAt(i);
+ if ((c >= 0x0001) && (c <= 0x007F)) {
+ bytearr[count++] = (byte) c;
+
+ } else if (c > 0x07FF) {
+ bytearr[count++] = (byte) (0xE0 | ((c >> 12) & 0x0F));
+ bytearr[count++] = (byte) (0x80 | ((c >> 6) & 0x3F));
+ bytearr[count++] = (byte) (0x80 | (c & 0x3F));
+ } else {
+ bytearr[count++] = (byte) (0xC0 | ((c >> 6) & 0x1F));
+ bytearr[count++] = (byte) (0x80 | (c & 0x3F));
+ }
+ }
+
+ this.position = count;
+ }
+
+ private void resize(int minCapacityAdd) throws IOException {
+ int newLen = Math.max(this.buffer.length * 2, this.buffer.length + minCapacityAdd);
+ byte[] nb;
+ try {
+ nb = new byte[newLen];
+ }
+ catch (NegativeArraySizeException e) {
+ throw new IOException("Serialization failed because the record length would exceed 2GB (max addressable array size in Java).");
+ }
+ catch (OutOfMemoryError e) {
+ // this was too large to allocate, try the smaller size (if possible)
+ if (newLen > this.buffer.length + minCapacityAdd) {
+ newLen = this.buffer.length + minCapacityAdd;
+ try {
+ nb = new byte[newLen];
+ }
+ catch (OutOfMemoryError ee) {
+ // still not possible. give an informative exception message that reports the size
+ throw new IOException("Failed to serialize element. Serialized size (> "
+ + newLen + " bytes) exceeds JVM heap space", ee);
+ }
+ } else {
+ throw new IOException("Failed to serialize element. Serialized size (> "
+ + newLen + " bytes) exceeds JVM heap space", e);
+ }
+ }
+
+ System.arraycopy(this.buffer, 0, nb, 0, this.position);
+ this.buffer = nb;
+ this.wrapper = ByteBuffer.wrap(this.buffer);
+ }
+
+ @Override
+ public void skipBytesToWrite(int numBytes) throws IOException {
+ if (buffer.length - this.position < numBytes){
+ throw new EOFException("Could not skip " + numBytes + " bytes.");
+ }
+
+ this.position += numBytes;
+ }
+
+ @Override
+ public void write(DataInputView source, int numBytes) throws IOException {
+ if (buffer.length - this.position < numBytes){
+ throw new EOFException("Could not write " + numBytes + " bytes. Buffer overflow.");
+ }
+
+ source.readFully(this.buffer, this.position, numBytes);
+ this.position += numBytes;
+ }
+
+ // ------------------------------------------------------------------------
+ // Utilities
+ // ------------------------------------------------------------------------
+
+ @SuppressWarnings("restriction")
+ private static final sun.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE;
+
+ @SuppressWarnings("restriction")
+ private static final long BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class);
+
+ private static final boolean LITTLE_ENDIAN = (MemoryUtils.NATIVE_BYTE_ORDER == ByteOrder.LITTLE_ENDIAN);
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-core/src/test/java/org/apache/flink/core/memory/DataInputDeserializerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/memory/DataInputDeserializerTest.java b/flink-core/src/test/java/org/apache/flink/core/memory/DataInputDeserializerTest.java
new file mode 100644
index 0000000..26d407d
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/core/memory/DataInputDeserializerTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.memory;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+
+/**
+ * Test suite for the {@link DataInputDeserializer} class.
+ */
+public class DataInputDeserializerTest {
+
+ @Test
+ public void testAvailable() throws Exception {
+ byte[] bytes;
+ DataInputDeserializer dis;
+
+ bytes = new byte[] {};
+ dis = new DataInputDeserializer(bytes, 0, bytes.length);
+ Assert.assertEquals(bytes.length, dis.available());
+
+ bytes = new byte[] {1, 2, 3};
+ dis = new DataInputDeserializer(bytes, 0, bytes.length);
+ Assert.assertEquals(bytes.length, dis.available());
+
+ dis.readByte();
+ Assert.assertEquals(2, dis.available());
+ dis.readByte();
+ Assert.assertEquals(1, dis.available());
+ dis.readByte();
+ Assert.assertEquals(0, dis.available());
+
+ try {
+ dis.readByte();
+ Assert.fail("Did not throw expected IOException");
+ } catch (IOException e) {
+ // ignore
+ }
+ Assert.assertEquals(0, dis.available());
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-core/src/test/java/org/apache/flink/core/memory/DataInputOutputSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/memory/DataInputOutputSerializerTest.java b/flink-core/src/test/java/org/apache/flink/core/memory/DataInputOutputSerializerTest.java
new file mode 100644
index 0000000..02a7ea7
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/core/memory/DataInputOutputSerializerTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.memory;
+
+import org.apache.flink.testutils.serialization.types.SerializationTestType;
+import org.apache.flink.testutils.serialization.types.SerializationTestTypeFactory;
+import org.apache.flink.testutils.serialization.types.Util;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+
+/**
+ * Tests for the combination of {@link DataOutputSerializer} and {@link DataInputDeserializer}.
+ */
+public class DataInputOutputSerializerTest {
+
+ @Test
+ public void testWrapAsByteBuffer() {
+ SerializationTestType randomInt = Util.randomRecord(SerializationTestTypeFactory.INT);
+
+ DataOutputSerializer serializer = new DataOutputSerializer(randomInt.length());
+ MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(randomInt.length());
+
+ try {
+ // empty buffer, read buffer should be empty
+ ByteBuffer wrapper = serializer.wrapAsByteBuffer();
+
+ Assert.assertEquals(0, wrapper.position());
+ Assert.assertEquals(0, wrapper.limit());
+
+ // write to data output, read buffer should still be empty
+ randomInt.write(serializer);
+
+ Assert.assertEquals(0, wrapper.position());
+ Assert.assertEquals(0, wrapper.limit());
+
+ // get updated read buffer, read buffer should contain written data
+ wrapper = serializer.wrapAsByteBuffer();
+
+ Assert.assertEquals(0, wrapper.position());
+ Assert.assertEquals(randomInt.length(), wrapper.limit());
+
+ // clear data output, read buffer should still contain written data
+ serializer.clear();
+
+ Assert.assertEquals(0, wrapper.position());
+ Assert.assertEquals(randomInt.length(), wrapper.limit());
+
+ // get updated read buffer, should be empty
+ wrapper = serializer.wrapAsByteBuffer();
+
+ Assert.assertEquals(0, wrapper.position());
+ Assert.assertEquals(0, wrapper.limit());
+
+ // write to data output and read back to memory
+ randomInt.write(serializer);
+ wrapper = serializer.wrapAsByteBuffer();
+
+ segment.put(0, wrapper, randomInt.length());
+
+ Assert.assertEquals(randomInt.length(), wrapper.position());
+ Assert.assertEquals(randomInt.length(), wrapper.limit());
+ } catch (IOException e) {
+ e.printStackTrace();
+ Assert.fail("Test encountered an unexpected exception.");
+ }
+ }
+
+ @Test
+ public void testRandomValuesWriteRead() {
+ final int numElements = 100000;
+ final ArrayDeque<SerializationTestType> reference = new ArrayDeque<>();
+
+ DataOutputSerializer serializer = new DataOutputSerializer(1);
+
+ for (SerializationTestType value : Util.randomRecords(numElements)) {
+ reference.add(value);
+
+ try {
+ value.write(serializer);
+ } catch (IOException e) {
+ e.printStackTrace();
+ Assert.fail("Test encountered an unexpected exception.");
+ }
+ }
+
+ DataInputDeserializer deserializer = new DataInputDeserializer(serializer.wrapAsByteBuffer());
+
+ for (SerializationTestType expected : reference) {
+ try {
+ SerializationTestType actual = expected.getClass().newInstance();
+ actual.read(deserializer);
+
+ Assert.assertEquals(expected, actual);
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail("Test encountered an unexpected exception.");
+ }
+ }
+
+ reference.clear();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/AsciiStringType.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/AsciiStringType.java b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/AsciiStringType.java
new file mode 100644
index 0000000..d845a39
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/AsciiStringType.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.testutils.serialization.types;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+import java.util.Random;
+
+public class AsciiStringType implements SerializationTestType {
+
+ private static final int MAX_LEN = 1500;
+
+ public String value;
+
+ public AsciiStringType() {
+ this.value = "";
+ }
+
+ private AsciiStringType(String value) {
+ this.value = value;
+ }
+
+ @Override
+ public AsciiStringType getRandom(Random rnd) {
+ final StringBuilder bld = new StringBuilder();
+ final int len = rnd.nextInt(MAX_LEN + 1);
+
+ for (int i = 0; i < len; i++) {
+ // 1--127
+ bld.append((char) (rnd.nextInt(126) + 1));
+ }
+
+ return new AsciiStringType(bld.toString());
+ }
+
+ @Override
+ public int length() {
+ return value.getBytes(ConfigConstants.DEFAULT_CHARSET).length + 2;
+ }
+
+ @Override
+ public void write(DataOutputView out) throws IOException {
+ out.writeUTF(this.value);
+ }
+
+ @Override
+ public void read(DataInputView in) throws IOException {
+ this.value = in.readUTF();
+ }
+
+ @Override
+ public int hashCode() {
+ return this.value.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof AsciiStringType) {
+ AsciiStringType other = (AsciiStringType) obj;
+ return this.value.equals(other.value);
+ } else {
+ return false;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/BooleanType.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/BooleanType.java b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/BooleanType.java
new file mode 100644
index 0000000..cb696b1
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/BooleanType.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.testutils.serialization.types;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+public class BooleanType implements SerializationTestType {
+
+ private boolean value;
+
+ public BooleanType() {
+ this.value = false;
+ }
+
+ private BooleanType(boolean value) {
+ this.value = value;
+ }
+
+ @Override
+ public BooleanType getRandom(Random rnd) {
+ return new BooleanType(rnd.nextBoolean());
+ }
+
+ @Override
+ public int length() {
+ return 1;
+ }
+
+ @Override
+ public void write(DataOutputView out) throws IOException {
+ out.writeBoolean(this.value);
+ }
+
+ @Override
+ public void read(DataInputView in) throws IOException {
+ this.value = in.readBoolean();
+ }
+
+ @Override
+ public int hashCode() {
+ return this.value ? 1 : 0;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof BooleanType) {
+ BooleanType other = (BooleanType) obj;
+ return this.value == other.value;
+ } else {
+ return false;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/ByteArrayType.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/ByteArrayType.java b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/ByteArrayType.java
new file mode 100644
index 0000000..c33e43d
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/ByteArrayType.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.testutils.serialization.types;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Random;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+public class ByteArrayType implements SerializationTestType {
+
+ private static final int MAX_LEN = 512 * 15;
+
+ private byte[] data;
+
+ public ByteArrayType() {
+ this.data = new byte[0];
+ }
+
+ public ByteArrayType(byte[] data) {
+ this.data = data;
+ }
+
+ @Override
+ public ByteArrayType getRandom(Random rnd) {
+ final int len = rnd.nextInt(MAX_LEN) + 1;
+ final byte[] data = new byte[len];
+ rnd.nextBytes(data);
+ return new ByteArrayType(data);
+ }
+
+ @Override
+ public int length() {
+ return data.length + 4;
+ }
+
+ @Override
+ public void write(DataOutputView out) throws IOException {
+ out.writeInt(this.data.length);
+ out.write(this.data);
+ }
+
+ @Override
+ public void read(DataInputView in) throws IOException {
+ final int len = in.readInt();
+ this.data = new byte[len];
+ in.readFully(this.data);
+ }
+
+ @Override
+ public int hashCode() {
+ return Arrays.hashCode(this.data);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof ByteArrayType) {
+ ByteArrayType other = (ByteArrayType) obj;
+ return Arrays.equals(this.data, other.data);
+ } else {
+ return false;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/ByteSubArrayType.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/ByteSubArrayType.java b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/ByteSubArrayType.java
new file mode 100644
index 0000000..2d5a48a
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/ByteSubArrayType.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.testutils.serialization.types;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Random;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+public class ByteSubArrayType implements SerializationTestType {
+
+ private static final int MAX_LEN = 512;
+
+ private final byte[] data;
+
+ private int len;
+
+ public ByteSubArrayType() {
+ this.data = new byte[MAX_LEN];
+ this.len = 0;
+ }
+
+ @Override
+ public ByteSubArrayType getRandom(Random rnd) {
+ final int len = rnd.nextInt(MAX_LEN) + 1;
+ final ByteSubArrayType t = new ByteSubArrayType();
+ t.len = len;
+
+ final byte[] data = t.data;
+ for (int i = 0; i < len; i++) {
+ data[i] = (byte) rnd.nextInt(256);
+ }
+
+ return t;
+ }
+
+ @Override
+ public int length() {
+ return len + 4;
+ }
+
+ @Override
+ public void write(DataOutputView out) throws IOException {
+ out.writeInt(this.len);
+ out.write(this.data, 0, this.len);
+ }
+
+ @Override
+ public void read(DataInputView in) throws IOException {
+ this.len = in.readInt();
+ in.readFully(this.data, 0, this.len);
+ }
+
+ @Override
+ public int hashCode() {
+ final byte[] copy = new byte[this.len];
+ System.arraycopy(this.data, 0, copy, 0, this.len);
+ return Arrays.hashCode(copy);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof ByteSubArrayType) {
+ ByteSubArrayType other = (ByteSubArrayType) obj;
+ if (this.len == other.len) {
+ for (int i = 0; i < this.len; i++) {
+ if (this.data[i] != other.data[i]) {
+ return false;
+ }
+ }
+ return true;
+ } else {
+ return false;
+ }
+ } else {
+ return false;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/ByteType.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/ByteType.java b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/ByteType.java
new file mode 100644
index 0000000..8b843be
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/ByteType.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.testutils.serialization.types;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+public class ByteType implements SerializationTestType {
+
+ private byte value;
+
+ public ByteType() {
+ this.value = (byte) 0;
+ }
+
+ private ByteType(byte value) {
+ this.value = value;
+ }
+
+ @Override
+ public ByteType getRandom(Random rnd) {
+ return new ByteType((byte) rnd.nextInt(256));
+ }
+
+ @Override
+ public int length() {
+ return 1;
+ }
+
+ @Override
+ public void write(DataOutputView out) throws IOException {
+ out.writeByte(this.value);
+ }
+
+ @Override
+ public void read(DataInputView in) throws IOException {
+ this.value = in.readByte();
+ }
+
+ @Override
+ public int hashCode() {
+ return this.value;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof ByteType) {
+ ByteType other = (ByteType) obj;
+ return this.value == other.value;
+ } else {
+ return false;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/CharType.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/CharType.java b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/CharType.java
new file mode 100644
index 0000000..962de7f
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/CharType.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.testutils.serialization.types;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+public class CharType implements SerializationTestType {
+
+ private char value;
+
+ public CharType() {
+ this.value = 0;
+ }
+
+ private CharType(char value) {
+ this.value = value;
+ }
+
+ @Override
+ public CharType getRandom(Random rnd) {
+ return new CharType((char) rnd.nextInt(10000));
+ }
+
+ @Override
+ public int length() {
+ return 2;
+ }
+
+ @Override
+ public void write(DataOutputView out) throws IOException {
+ out.writeChar(this.value);
+ }
+
+ @Override
+ public void read(DataInputView in) throws IOException {
+ this.value = in.readChar();
+ }
+
+ @Override
+ public int hashCode() {
+ return this.value;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof CharType) {
+ CharType other = (CharType) obj;
+ return this.value == other.value;
+ } else {
+ return false;
+ }
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/DoubleType.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/DoubleType.java b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/DoubleType.java
new file mode 100644
index 0000000..7119c34
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/DoubleType.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.testutils.serialization.types;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+public class DoubleType implements SerializationTestType {
+
+ private double value;
+
+ public DoubleType() {
+ this.value = 0;
+ }
+
+ private DoubleType(double value) {
+ this.value = value;
+ }
+
+ @Override
+ public DoubleType getRandom(Random rnd) {
+ return new DoubleType(rnd.nextDouble());
+ }
+
+ @Override
+ public int length() {
+ return 8;
+ }
+
+ @Override
+ public void write(DataOutputView out) throws IOException {
+ out.writeDouble(this.value);
+ }
+
+ @Override
+ public void read(DataInputView in) throws IOException {
+ this.value = in.readDouble();
+ }
+
+ @Override
+ public int hashCode() {
+ final long l = Double.doubleToLongBits(this.value);
+ return (int) (l ^ l >>> 32);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof DoubleType) {
+ DoubleType other = (DoubleType) obj;
+ return Double.doubleToLongBits(this.value) == Double.doubleToLongBits(other.value);
+ } else {
+ return false;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/FloatType.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/FloatType.java b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/FloatType.java
new file mode 100644
index 0000000..9fa6e63
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/FloatType.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.testutils.serialization.types;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+public class FloatType implements SerializationTestType {
+
+ private float value;
+
+ public FloatType() {
+ this.value = 0;
+ }
+
+ private FloatType(float value) {
+ this.value = value;
+ }
+
+ @Override
+ public FloatType getRandom(Random rnd) {
+ return new FloatType(rnd.nextFloat());
+ }
+
+ @Override
+ public int length() {
+ return 4;
+ }
+
+ @Override
+ public void write(DataOutputView out) throws IOException {
+ out.writeFloat(this.value);
+ }
+
+ @Override
+ public void read(DataInputView in) throws IOException {
+ this.value = in.readFloat();
+ }
+
+ @Override
+ public int hashCode() {
+ return Float.floatToIntBits(this.value);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof FloatType) {
+ FloatType other = (FloatType) obj;
+ return Float.floatToIntBits(this.value) == Float.floatToIntBits(other.value);
+ } else {
+ return false;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/IntType.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/IntType.java b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/IntType.java
new file mode 100644
index 0000000..52313ab
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/IntType.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.testutils.serialization.types;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+public class IntType implements SerializationTestType {
+
+ private int value;
+
+ public IntType() {
+ this.value = 0;
+ }
+
+ public IntType(int value) {
+ this.value = value;
+ }
+
+ @Override
+ public IntType getRandom(Random rnd) {
+ return new IntType(rnd.nextInt());
+ }
+
+ @Override
+ public int length() {
+ return 4;
+ }
+
+ @Override
+ public void write(DataOutputView out) throws IOException {
+ out.writeInt(this.value);
+ }
+
+ @Override
+ public void read(DataInputView in) throws IOException {
+ this.value = in.readInt();
+ }
+
+ @Override
+ public int hashCode() {
+ return this.value;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof IntType) {
+ IntType other = (IntType) obj;
+ return this.value == other.value;
+ } else {
+ return false;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/LongType.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/LongType.java b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/LongType.java
new file mode 100644
index 0000000..3e47b4b
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/LongType.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.testutils.serialization.types;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+public class LongType implements SerializationTestType {
+
+ private long value;
+
+ public LongType() {
+ this.value = 0;
+ }
+
+ private LongType(long value) {
+ this.value = value;
+ }
+
+ @Override
+ public LongType getRandom(Random rnd) {
+ return new LongType(rnd.nextLong());
+ }
+
+ @Override
+ public int length() {
+ return 8;
+ }
+
+ @Override
+ public void write(DataOutputView out) throws IOException {
+ out.writeLong(this.value);
+ }
+
+ @Override
+ public void read(DataInputView in) throws IOException {
+ this.value = in.readLong();
+ }
+
+ @Override
+ public int hashCode() {
+ return (int) (this.value ^ this.value >>> 32);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof LongType) {
+ LongType other = (LongType) obj;
+ return this.value == other.value;
+ } else {
+ return false;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/SerializationTestType.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/SerializationTestType.java b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/SerializationTestType.java
new file mode 100644
index 0000000..1edd796
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/SerializationTestType.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.testutils.serialization.types;
+
+import java.util.Random;
+
+import org.apache.flink.core.io.IOReadableWritable;
+
+public interface SerializationTestType extends IOReadableWritable {
+
+ public SerializationTestType getRandom(Random rnd);
+
+ public int length();
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/SerializationTestTypeFactory.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/SerializationTestTypeFactory.java b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/SerializationTestTypeFactory.java
new file mode 100644
index 0000000..392e15a
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/SerializationTestTypeFactory.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.testutils.serialization.types;
+
+public enum SerializationTestTypeFactory {
+
+ BOOLEAN(new BooleanType()),
+ BYTE_ARRAY(new ByteArrayType()),
+ BYTE_SUB_ARRAY(new ByteSubArrayType()),
+ BYTE(new ByteType()),
+ CHAR(new CharType()),
+ DOUBLE(new DoubleType()),
+ FLOAT(new FloatType()),
+ INT(new IntType()),
+ LONG(new LongType()),
+ SHORT(new ShortType()),
+ UNSIGNED_BYTE(new UnsignedByteType()),
+ UNSIGNED_SHORT(new UnsignedShortType()),
+ STRING(new AsciiStringType());
+
+ private final SerializationTestType factory;
+
+ SerializationTestTypeFactory(SerializationTestType type) {
+ this.factory = type;
+ }
+
+ public SerializationTestType factory() {
+ return this.factory;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/ShortType.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/ShortType.java b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/ShortType.java
new file mode 100644
index 0000000..b5b3c27
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/ShortType.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.testutils.serialization.types;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+public class ShortType implements SerializationTestType {
+
+ private short value;
+
+ public ShortType() {
+ this.value = (short) 0;
+ }
+
+ private ShortType(short value) {
+ this.value = value;
+ }
+
+ @Override
+ public ShortType getRandom(Random rnd) {
+ return new ShortType((short) rnd.nextInt(65536));
+ }
+
+ @Override
+ public int length() {
+ return 2;
+ }
+
+ @Override
+ public void write(DataOutputView out) throws IOException {
+ out.writeShort(this.value);
+ }
+
+ @Override
+ public void read(DataInputView in) throws IOException {
+ this.value = in.readShort();
+ }
+
+ @Override
+ public int hashCode() {
+ return this.value;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof ShortType) {
+ ShortType other = (ShortType) obj;
+ return this.value == other.value;
+ } else {
+ return false;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/UnsignedByteType.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/UnsignedByteType.java b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/UnsignedByteType.java
new file mode 100644
index 0000000..29b897a
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/UnsignedByteType.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.testutils.serialization.types;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+public class UnsignedByteType implements SerializationTestType {
+
+ private int value;
+
+ public UnsignedByteType() {
+ this.value = 0;
+ }
+
+ private UnsignedByteType(int value) {
+ this.value = value;
+ }
+
+ @Override
+ public UnsignedByteType getRandom(Random rnd) {
+ return new UnsignedByteType(rnd.nextInt(128) + 128);
+ }
+
+ @Override
+ public int length() {
+ return 1;
+ }
+
+ @Override
+ public void write(DataOutputView out) throws IOException {
+ out.writeByte(this.value);
+ }
+
+ @Override
+ public void read(DataInputView in) throws IOException {
+ this.value = in.readUnsignedByte();
+ }
+
+ @Override
+ public int hashCode() {
+ return this.value;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof UnsignedByteType) {
+ UnsignedByteType other = (UnsignedByteType) obj;
+ return this.value == other.value;
+ } else {
+ return false;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/UnsignedShortType.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/UnsignedShortType.java b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/UnsignedShortType.java
new file mode 100644
index 0000000..47d0156
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/UnsignedShortType.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.testutils.serialization.types;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+public class UnsignedShortType implements SerializationTestType {
+
+ private int value;
+
+ public UnsignedShortType() {
+ this.value = 0;
+ }
+
+ private UnsignedShortType(int value) {
+ this.value = value;
+ }
+
+ @Override
+ public UnsignedShortType getRandom(Random rnd) {
+ return new UnsignedShortType(rnd.nextInt(32768) + 32768);
+ }
+
+ @Override
+ public int length() {
+ return 2;
+ }
+
+ @Override
+ public void write(DataOutputView out) throws IOException {
+ out.writeShort(this.value);
+ }
+
+ @Override
+ public void read(DataInputView in) throws IOException {
+ this.value = in.readUnsignedShort();
+ }
+
+ @Override
+ public int hashCode() {
+ return this.value;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof UnsignedShortType) {
+ UnsignedShortType other = (UnsignedShortType) obj;
+ return this.value == other.value;
+ } else {
+ return false;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/Util.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/Util.java b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/Util.java
new file mode 100644
index 0000000..b34701f
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/Util.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.testutils.serialization.types;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Random;
+
+/**
+ * Utility class to help serialization for testing.
+ */
+public final class Util {
+
+ private static final long SEED = 64871654635745873L;
+
+ private static Random random = new Random(SEED);
+
+ public static SerializationTestType randomRecord(SerializationTestTypeFactory type) {
+ return type.factory().getRandom(Util.random);
+ }
+
+ public static MockRecords randomRecords(final int numElements, final SerializationTestTypeFactory type) {
+
+ return new MockRecords(numElements) {
+ @Override
+ protected SerializationTestType getRecord() {
+ return type.factory().getRandom(Util.random);
+ }
+ };
+ }
+
+ public static MockRecords randomRecords(final int numElements) {
+
+ return new MockRecords(numElements) {
+ @Override
+ protected SerializationTestType getRecord() {
+ // select random test type factory
+ SerializationTestTypeFactory[] types = SerializationTestTypeFactory.values();
+ int i = Util.random.nextInt(types.length);
+
+ return types[i].factory().getRandom(Util.random);
+ }
+ };
+ }
+
+ // -----------------------------------------------------------------------------------------------------------------
+ public abstract static class MockRecords implements Iterable<SerializationTestType> {
+
+ private int numRecords;
+
+ public MockRecords(int numRecords) {
+ this.numRecords = numRecords;
+ }
+
+ @Override
+ public Iterator<SerializationTestType> iterator() {
+ return new Iterator<SerializationTestType>() {
+ @Override
+ public boolean hasNext() {
+ return numRecords > 0;
+ }
+
+ @Override
+ public SerializationTestType next() {
+ if (numRecords > 0) {
+ numRecords--;
+
+ return getRecord();
+ }
+
+ throw new NoSuchElementException();
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+
+ abstract protected SerializationTestType getRecord();
+ }
+
+ /**
+ * No instantiation.
+ */
+ private Util() {
+ throw new RuntimeException();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/serialization/DataInputDeserializer.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/serialization/DataInputDeserializer.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/serialization/DataInputDeserializer.java
deleted file mode 100644
index 878df85..0000000
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/serialization/DataInputDeserializer.java
+++ /dev/null
@@ -1,392 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.client.state.serialization;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.MemoryUtils;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.UTFDataFormatException;
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-
-/**
- * A simple and efficient deserializer for the {@link java.io.DataInput} interface.
- *
- * <p><b>THIS WAS COPIED FROM RUNTIME SO THAT WE AVOID THE DEPENDENCY.</b>
- */
-public class DataInputDeserializer implements DataInputView, java.io.Serializable {
-
- private static final long serialVersionUID = 1L;
-
- // ------------------------------------------------------------------------
-
- private byte[] buffer;
-
- private int end;
-
- private int position;
-
- // ------------------------------------------------------------------------
-
- public DataInputDeserializer() {}
-
- public DataInputDeserializer(byte[] buffer) {
- setBuffer(buffer, 0, buffer.length);
- }
-
- public DataInputDeserializer(byte[] buffer, int start, int len) {
- setBuffer(buffer, start, len);
- }
-
- public DataInputDeserializer(ByteBuffer buffer) {
- setBuffer(buffer);
- }
-
- // ------------------------------------------------------------------------
- // Changing buffers
- // ------------------------------------------------------------------------
-
- public void setBuffer(ByteBuffer buffer) {
- if (buffer.hasArray()) {
- this.buffer = buffer.array();
- this.position = buffer.arrayOffset() + buffer.position();
- this.end = this.position + buffer.remaining();
- } else if (buffer.isDirect()) {
- this.buffer = new byte[buffer.remaining()];
- this.position = 0;
- this.end = this.buffer.length;
-
- buffer.get(this.buffer);
- } else {
- throw new IllegalArgumentException("The given buffer is neither an array-backed heap ByteBuffer, nor a direct ByteBuffer.");
- }
- }
-
- public void setBuffer(byte[] buffer, int start, int len) {
- if (buffer == null) {
- throw new NullPointerException();
- }
-
- if (start < 0 || len < 0 || start + len > buffer.length) {
- throw new IllegalArgumentException();
- }
-
- this.buffer = buffer;
- this.position = start;
- this.end = start + len;
- }
-
- public void releaseArrays() {
- this.buffer = null;
- }
-
- // ----------------------------------------------------------------------------------------
- // Data Input
- // ----------------------------------------------------------------------------------------
-
- public int available() {
- if (position < end) {
- return end - position;
- } else {
- return 0;
- }
- }
-
- @Override
- public boolean readBoolean() throws IOException {
- if (this.position < this.end) {
- return this.buffer[this.position++] != 0;
- } else {
- throw new EOFException();
- }
- }
-
- @Override
- public byte readByte() throws IOException {
- if (this.position < this.end) {
- return this.buffer[this.position++];
- } else {
- throw new EOFException();
- }
- }
-
- @Override
- public char readChar() throws IOException {
- if (this.position < this.end - 1) {
- return (char) (((this.buffer[this.position++] & 0xff) << 8) | (this.buffer[this.position++] & 0xff));
- } else {
- throw new EOFException();
- }
- }
-
- @Override
- public double readDouble() throws IOException {
- return Double.longBitsToDouble(readLong());
- }
-
- @Override
- public float readFloat() throws IOException {
- return Float.intBitsToFloat(readInt());
- }
-
- @Override
- public void readFully(byte[] b) throws IOException {
- readFully(b, 0, b.length);
- }
-
- @Override
- public void readFully(byte[] b, int off, int len) throws IOException {
- if (len >= 0) {
- if (off <= b.length - len) {
- if (this.position <= this.end - len) {
- System.arraycopy(this.buffer, position, b, off, len);
- position += len;
- } else {
- throw new EOFException();
- }
- } else {
- throw new ArrayIndexOutOfBoundsException();
- }
- } else if (len < 0) {
- throw new IllegalArgumentException("Length may not be negative.");
- }
- }
-
- @Override
- public int readInt() throws IOException {
- if (this.position >= 0 && this.position < this.end - 3) {
- @SuppressWarnings("restriction")
- int value = UNSAFE.getInt(this.buffer, BASE_OFFSET + this.position);
- if (LITTLE_ENDIAN) {
- value = Integer.reverseBytes(value);
- }
-
- this.position += 4;
- return value;
- } else {
- throw new EOFException();
- }
- }
-
- @Override
- public String readLine() throws IOException {
- if (this.position < this.end) {
- // read until a newline is found
- StringBuilder bld = new StringBuilder();
- char curr = (char) readUnsignedByte();
- while (position < this.end && curr != '\n') {
- bld.append(curr);
- curr = (char) readUnsignedByte();
- }
- // trim a trailing carriage return
- int len = bld.length();
- if (len > 0 && bld.charAt(len - 1) == '\r') {
- bld.setLength(len - 1);
- }
- String s = bld.toString();
- bld.setLength(0);
- return s;
- } else {
- return null;
- }
- }
-
- @Override
- public long readLong() throws IOException {
- if (position >= 0 && position < this.end - 7) {
- @SuppressWarnings("restriction")
- long value = UNSAFE.getLong(this.buffer, BASE_OFFSET + this.position);
- if (LITTLE_ENDIAN) {
- value = Long.reverseBytes(value);
- }
- this.position += 8;
- return value;
- } else {
- throw new EOFException();
- }
- }
-
- @Override
- public short readShort() throws IOException {
- if (position >= 0 && position < this.end - 1) {
- return (short) ((((this.buffer[position++]) & 0xff) << 8) | ((this.buffer[position++]) & 0xff));
- } else {
- throw new EOFException();
- }
- }
-
- @Override
- public String readUTF() throws IOException {
- int utflen = readUnsignedShort();
- byte[] bytearr = new byte[utflen];
- char[] chararr = new char[utflen];
-
- int c, char2, char3;
- int count = 0;
- int chararrCount = 0;
-
- readFully(bytearr, 0, utflen);
-
- while (count < utflen) {
- c = (int) bytearr[count] & 0xff;
- if (c > 127) {
- break;
- }
- count++;
- chararr[chararrCount++] = (char) c;
- }
-
- while (count < utflen) {
- c = (int) bytearr[count] & 0xff;
- switch (c >> 4) {
- case 0:
- case 1:
- case 2:
- case 3:
- case 4:
- case 5:
- case 6:
- case 7:
- /* 0xxxxxxx */
- count++;
- chararr[chararrCount++] = (char) c;
- break;
- case 12:
- case 13:
- /* 110x xxxx 10xx xxxx */
- count += 2;
- if (count > utflen) {
- throw new UTFDataFormatException("malformed input: partial character at end");
- }
- char2 = (int) bytearr[count - 1];
- if ((char2 & 0xC0) != 0x80) {
- throw new UTFDataFormatException("malformed input around byte " + count);
- }
- chararr[chararrCount++] = (char) (((c & 0x1F) << 6) | (char2 & 0x3F));
- break;
- case 14:
- /* 1110 xxxx 10xx xxxx 10xx xxxx */
- count += 3;
- if (count > utflen) {
- throw new UTFDataFormatException("malformed input: partial character at end");
- }
- char2 = (int) bytearr[count - 2];
- char3 = (int) bytearr[count - 1];
- if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80)) {
- throw new UTFDataFormatException("malformed input around byte " + (count - 1));
- }
- chararr[chararrCount++] = (char) (((c & 0x0F) << 12) | ((char2 & 0x3F) << 6) | (char3 & 0x3F));
- break;
- default:
- /* 10xx xxxx, 1111 xxxx */
- throw new UTFDataFormatException("malformed input around byte " + count);
- }
- }
- // The number of chars produced may be less than utflen
- return new String(chararr, 0, chararrCount);
- }
-
- @Override
- public int readUnsignedByte() throws IOException {
- if (this.position < this.end) {
- return (this.buffer[this.position++] & 0xff);
- } else {
- throw new EOFException();
- }
- }
-
- @Override
- public int readUnsignedShort() throws IOException {
- if (this.position < this.end - 1) {
- return ((this.buffer[this.position++] & 0xff) << 8) | (this.buffer[this.position++] & 0xff);
- } else {
- throw new EOFException();
- }
- }
-
- @Override
- public int skipBytes(int n) throws IOException {
- if (this.position <= this.end - n) {
- this.position += n;
- return n;
- } else {
- n = this.end - this.position;
- this.position = this.end;
- return n;
- }
- }
-
- @Override
- public void skipBytesToRead(int numBytes) throws IOException {
- int skippedBytes = skipBytes(numBytes);
-
- if (skippedBytes < numBytes){
- throw new EOFException("Could not skip " + numBytes + " bytes.");
- }
- }
-
- @Override
- public int read(byte[] b, int off, int len) throws IOException {
- if (b == null){
- throw new NullPointerException("Byte array b cannot be null.");
- }
-
- if (off < 0){
- throw new IndexOutOfBoundsException("Offset cannot be negative.");
- }
-
- if (len < 0){
- throw new IndexOutOfBoundsException("Length cannot be negative.");
- }
-
- if (b.length - off < len){
- throw new IndexOutOfBoundsException("Byte array does not provide enough space to store requested data" +
- ".");
- }
-
- if (this.position >= this.end) {
- return -1;
- } else {
- int toRead = Math.min(this.end - this.position, len);
- System.arraycopy(this.buffer, this.position, b, off, toRead);
- this.position += toRead;
-
- return toRead;
- }
- }
-
- @Override
- public int read(byte[] b) throws IOException {
- return read(b, 0, b.length);
- }
-
- // ------------------------------------------------------------------------
- // Utilities
- // ------------------------------------------------------------------------
-
- @SuppressWarnings("restriction")
- private static final sun.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE;
-
- @SuppressWarnings("restriction")
- private static final long BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class);
-
- private static final boolean LITTLE_ENDIAN = (MemoryUtils.NATIVE_BYTE_ORDER == ByteOrder.LITTLE_ENDIAN);
-}