You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/11/02 21:22:31 UTC

[2/3] flink git commit: [FLINK-7968] [core] Move DataOutputSerializer and DataInputDeserializer to 'flink-core'

http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/serialization/DataOutputSerializer.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/serialization/DataOutputSerializer.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/serialization/DataOutputSerializer.java
deleted file mode 100644
index 5811c91..0000000
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/serialization/DataOutputSerializer.java
+++ /dev/null
@@ -1,344 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.client.state.serialization;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.MemoryUtils;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.UTFDataFormatException;
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-import java.util.Arrays;
-
-/**
- * A simple and efficient serializer for the {@link java.io.DataOutput} interface.
- *
- * <p><b>THIS WAS COPIED FROM RUNTIME SO THAT WE AVOID THE DEPENDENCY.</b>
- */
-public class DataOutputSerializer implements DataOutputView {
-
-	private static final Logger LOG = LoggerFactory.getLogger(DataOutputSerializer.class);
-
-	private static final int PRUNE_BUFFER_THRESHOLD = 5 * 1024 * 1024;
-
-	// ------------------------------------------------------------------------
-
-	private final byte[] startBuffer;
-
-	private byte[] buffer;
-
-	private int position;
-
-	private ByteBuffer wrapper;
-
-	// ------------------------------------------------------------------------
-
-	public DataOutputSerializer(int startSize) {
-		if (startSize < 1) {
-			throw new IllegalArgumentException();
-		}
-
-		this.startBuffer = new byte[startSize];
-		this.buffer = this.startBuffer;
-		this.wrapper = ByteBuffer.wrap(buffer);
-	}
-
-	public ByteBuffer wrapAsByteBuffer() {
-		this.wrapper.position(0);
-		this.wrapper.limit(this.position);
-		return this.wrapper;
-	}
-
-	public byte[] getByteArray() {
-		return buffer;
-	}
-
-	public byte[] getCopyOfBuffer() {
-		return Arrays.copyOf(buffer, position);
-	}
-
-	public void clear() {
-		this.position = 0;
-	}
-
-	public int length() {
-		return this.position;
-	}
-
-	public void pruneBuffer() {
-		if (this.buffer.length > PRUNE_BUFFER_THRESHOLD) {
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("Releasing serialization buffer of " + this.buffer.length + " bytes.");
-			}
-
-			this.buffer = this.startBuffer;
-			this.wrapper = ByteBuffer.wrap(this.buffer);
-		}
-	}
-
-	@Override
-	public String toString() {
-		return String.format("[pos=%d cap=%d]", this.position, this.buffer.length);
-	}
-
-	// ----------------------------------------------------------------------------------------
-	//                               Data Output
-	// ----------------------------------------------------------------------------------------
-
-	@Override
-	public void write(int b) throws IOException {
-		if (this.position >= this.buffer.length) {
-			resize(1);
-		}
-		this.buffer[this.position++] = (byte) (b & 0xff);
-	}
-
-	@Override
-	public void write(byte[] b) throws IOException {
-		write(b, 0, b.length);
-	}
-
-	@Override
-	public void write(byte[] b, int off, int len) throws IOException {
-		if (len < 0 || off > b.length - len) {
-			throw new ArrayIndexOutOfBoundsException();
-		}
-		if (this.position > this.buffer.length - len) {
-			resize(len);
-		}
-		System.arraycopy(b, off, this.buffer, this.position, len);
-		this.position += len;
-	}
-
-	@Override
-	public void writeBoolean(boolean v) throws IOException {
-		write(v ? 1 : 0);
-	}
-
-	@Override
-	public void writeByte(int v) throws IOException {
-		write(v);
-	}
-
-	@Override
-	public void writeBytes(String s) throws IOException {
-		final int sLen = s.length();
-		if (this.position >= this.buffer.length - sLen) {
-			resize(sLen);
-		}
-
-		for (int i = 0; i < sLen; i++) {
-			writeByte(s.charAt(i));
-		}
-		this.position += sLen;
-	}
-
-	@Override
-	public void writeChar(int v) throws IOException {
-		if (this.position >= this.buffer.length - 1) {
-			resize(2);
-		}
-		this.buffer[this.position++] = (byte) (v >> 8);
-		this.buffer[this.position++] = (byte) v;
-	}
-
-	@Override
-	public void writeChars(String s) throws IOException {
-		final int sLen = s.length();
-		if (this.position >= this.buffer.length - 2 * sLen) {
-			resize(2 * sLen);
-		}
-
-		for (int i = 0; i < sLen; i++) {
-			writeChar(s.charAt(i));
-		}
-	}
-
-	@Override
-	public void writeDouble(double v) throws IOException {
-		writeLong(Double.doubleToLongBits(v));
-	}
-
-	@Override
-	public void writeFloat(float v) throws IOException {
-		writeInt(Float.floatToIntBits(v));
-	}
-
-	@SuppressWarnings("restriction")
-	@Override
-	public void writeInt(int v) throws IOException {
-		if (this.position >= this.buffer.length - 3) {
-			resize(4);
-		}
-		if (LITTLE_ENDIAN) {
-			v = Integer.reverseBytes(v);
-		}
-		UNSAFE.putInt(this.buffer, BASE_OFFSET + this.position, v);
-		this.position += 4;
-	}
-
-	@SuppressWarnings("restriction")
-	@Override
-	public void writeLong(long v) throws IOException {
-		if (this.position >= this.buffer.length - 7) {
-			resize(8);
-		}
-		if (LITTLE_ENDIAN) {
-			v = Long.reverseBytes(v);
-		}
-		UNSAFE.putLong(this.buffer, BASE_OFFSET + this.position, v);
-		this.position += 8;
-	}
-
-	@Override
-	public void writeShort(int v) throws IOException {
-		if (this.position >= this.buffer.length - 1) {
-			resize(2);
-		}
-		this.buffer[this.position++] = (byte) ((v >>> 8) & 0xff);
-		this.buffer[this.position++] = (byte) ((v >>> 0) & 0xff);
-	}
-
-	@Override
-	public void writeUTF(String str) throws IOException {
-		int strlen = str.length();
-		int utflen = 0;
-		int c;
-
-		/* use charAt instead of copying String to char array */
-		for (int i = 0; i < strlen; i++) {
-			c = str.charAt(i);
-			if ((c >= 0x0001) && (c <= 0x007F)) {
-				utflen++;
-			} else if (c > 0x07FF) {
-				utflen += 3;
-			} else {
-				utflen += 2;
-			}
-		}
-
-		if (utflen > 65535) {
-			throw new UTFDataFormatException("Encoded string is too long: " + utflen);
-		}
-		else if (this.position > this.buffer.length - utflen - 2) {
-			resize(utflen + 2);
-		}
-
-		byte[] bytearr = this.buffer;
-		int count = this.position;
-
-		bytearr[count++] = (byte) ((utflen >>> 8) & 0xFF);
-		bytearr[count++] = (byte) ((utflen >>> 0) & 0xFF);
-
-		int i = 0;
-		for (i = 0; i < strlen; i++) {
-			c = str.charAt(i);
-			if (!((c >= 0x0001) && (c <= 0x007F))) {
-				break;
-			}
-			bytearr[count++] = (byte) c;
-		}
-
-		for (; i < strlen; i++) {
-			c = str.charAt(i);
-			if ((c >= 0x0001) && (c <= 0x007F)) {
-				bytearr[count++] = (byte) c;
-
-			} else if (c > 0x07FF) {
-				bytearr[count++] = (byte) (0xE0 | ((c >> 12) & 0x0F));
-				bytearr[count++] = (byte) (0x80 | ((c >> 6) & 0x3F));
-				bytearr[count++] = (byte) (0x80 | ((c >> 0) & 0x3F));
-			} else {
-				bytearr[count++] = (byte) (0xC0 | ((c >> 6) & 0x1F));
-				bytearr[count++] = (byte) (0x80 | ((c >> 0) & 0x3F));
-			}
-		}
-
-		this.position = count;
-	}
-
-	private void resize(int minCapacityAdd) throws IOException {
-		int newLen = Math.max(this.buffer.length * 2, this.buffer.length + minCapacityAdd);
-		byte[] nb;
-		try {
-			nb = new byte[newLen];
-		}
-		catch (NegativeArraySizeException e) {
-			throw new IOException("Serialization failed because the record length would exceed 2GB (max addressable array size in Java).");
-		}
-		catch (OutOfMemoryError e) {
-			// this was too large to allocate, try the smaller size (if possible)
-			if (newLen > this.buffer.length + minCapacityAdd) {
-				newLen = this.buffer.length + minCapacityAdd;
-				try {
-					nb = new byte[newLen];
-				}
-				catch (OutOfMemoryError ee) {
-					// still not possible. give an informative exception message that reports the size
-					throw new IOException("Failed to serialize element. Serialized size (> "
-							+ newLen + " bytes) exceeds JVM heap space", ee);
-				}
-			} else {
-				throw new IOException("Failed to serialize element. Serialized size (> "
-						+ newLen + " bytes) exceeds JVM heap space", e);
-			}
-		}
-
-		System.arraycopy(this.buffer, 0, nb, 0, this.position);
-		this.buffer = nb;
-		this.wrapper = ByteBuffer.wrap(this.buffer);
-	}
-
-	@Override
-	public void skipBytesToWrite(int numBytes) throws IOException {
-		if (buffer.length - this.position < numBytes){
-			throw new EOFException("Could not skip " + numBytes + " bytes.");
-		}
-
-		this.position += numBytes;
-	}
-
-	@Override
-	public void write(DataInputView source, int numBytes) throws IOException {
-		if (buffer.length - this.position < numBytes){
-			throw new EOFException("Could not write " + numBytes + " bytes. Buffer overflow.");
-		}
-
-		source.readFully(this.buffer, this.position, numBytes);
-		this.position += numBytes;
-	}
-
-	// ------------------------------------------------------------------------
-	//  Utilities
-	// ------------------------------------------------------------------------
-
-	@SuppressWarnings("restriction")
-	private static final sun.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE;
-
-	@SuppressWarnings("restriction")
-	private static final long BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class);
-
-	private static final boolean LITTLE_ENDIAN = (MemoryUtils.NATIVE_BYTE_ORDER == ByteOrder.LITTLE_ENDIAN);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/serialization/KvStateSerializer.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/serialization/KvStateSerializer.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/serialization/KvStateSerializer.java
index 4c69483..4a64678 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/serialization/KvStateSerializer.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/serialization/KvStateSerializer.java
@@ -20,6 +20,8 @@ package org.apache.flink.queryablestate.client.state.serialization;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;

http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/AdaptiveSpanningRecordDeserializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/AdaptiveSpanningRecordDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/AdaptiveSpanningRecordDeserializer.java
index 8f2c8fd..04a7a21 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/AdaptiveSpanningRecordDeserializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/AdaptiveSpanningRecordDeserializer.java
@@ -22,8 +22,8 @@ import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.util.DataInputDeserializer;
-import org.apache.flink.runtime.util.DataOutputSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
 
 import java.io.EOFException;
 import java.io.IOException;

http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
index aa8133f..742410e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
@@ -30,8 +30,8 @@ import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
-import org.apache.flink.runtime.util.DataInputDeserializer;
-import org.apache.flink.runtime.util.DataOutputSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
 import org.apache.flink.util.InstantiationUtil;
 
 import java.io.IOException;

http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
index 6c541a9..87b9e4c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
@@ -26,7 +26,7 @@ import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.util.DataOutputSerializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
 
 /**
  * Record serializer which serializes the complete record to an intermediate

http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
index 7c213b4..74260d4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
@@ -23,7 +23,7 @@ import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.util.DataInputDeserializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
 import org.apache.flink.util.StringUtils;
 
 import java.io.BufferedInputStream;

http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java
index 1791fe1..d821e0f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java
@@ -19,15 +19,15 @@
 package org.apache.flink.runtime.metrics.dump;
 
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputDeserializer;
 import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputSerializer;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.Histogram;
 import org.apache.flink.metrics.HistogramStatistics;
 import org.apache.flink.metrics.Meter;
 import org.apache.flink.metrics.Metric;
-import org.apache.flink.runtime.util.DataInputDeserializer;
-import org.apache.flink.runtime.util.DataOutputSerializer;
 import org.apache.flink.util.Preconditions;
 
 import org.slf4j.Logger;

http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataInputDeserializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataInputDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataInputDeserializer.java
deleted file mode 100644
index 4e8871a..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataInputDeserializer.java
+++ /dev/null
@@ -1,390 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.util;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.UTFDataFormatException;
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.MemoryUtils;
-
-/**
- * A simple and efficient deserializer for the {@link java.io.DataInput} interface.
- */
-public class DataInputDeserializer implements DataInputView, java.io.Serializable {
-	
-	private static final long serialVersionUID = 1L;
-
-	// ------------------------------------------------------------------------
-	
-	private byte[] buffer;
-	
-	private int end;
-
-	private int position;
-
-	// ------------------------------------------------------------------------
-	
-	public DataInputDeserializer() {}
-
-	public DataInputDeserializer(byte[] buffer) {
-		setBuffer(buffer, 0, buffer.length);
-	}
-
-	public DataInputDeserializer(byte[] buffer, int start, int len) {
-		setBuffer(buffer, start, len);
-	}
-	
-	public DataInputDeserializer(ByteBuffer buffer) {
-		setBuffer(buffer);
-	}
-
-	// ------------------------------------------------------------------------
-	//  Changing buffers
-	// ------------------------------------------------------------------------
-	
-	public void setBuffer(ByteBuffer buffer) {
-		if (buffer.hasArray()) {
-			this.buffer = buffer.array();
-			this.position = buffer.arrayOffset() + buffer.position();
-			this.end = this.position + buffer.remaining();
-		} else if (buffer.isDirect()) {
-			this.buffer = new byte[buffer.remaining()];
-			this.position = 0;
-			this.end = this.buffer.length;
-
-			buffer.get(this.buffer);
-		} else {
-			throw new IllegalArgumentException("The given buffer is neither an array-backed heap ByteBuffer, nor a direct ByteBuffer.");
-		}
-	}
-
-	public void setBuffer(byte[] buffer, int start, int len) {
-		if (buffer == null) {
-			throw new NullPointerException();
-		}
-
-		if (start < 0 || len < 0 || start + len > buffer.length) {
-			throw new IllegalArgumentException();
-		}
-
-		this.buffer = buffer;
-		this.position = start;
-		this.end = start + len;
-	}
-	
-	public void releaseArrays() {
-		this.buffer = null;
-	}
-
-	// ----------------------------------------------------------------------------------------
-	//                               Data Input
-	// ----------------------------------------------------------------------------------------
-
-	public int available() {
-		if (position < end) {
-			return end - position;
-		} else {
-			return 0;
-		}
-	}
-	
-	@Override
-	public boolean readBoolean() throws IOException {
-		if (this.position < this.end) {
-			return this.buffer[this.position++] != 0;
-		} else {
-			throw new EOFException();
-		}
-	}
-
-	@Override
-	public byte readByte() throws IOException {
-		if (this.position < this.end) {
-			return this.buffer[this.position++];
-		} else {
-			throw new EOFException();
-		}
-	}
-
-	@Override
-	public char readChar() throws IOException {
-		if (this.position < this.end - 1) {
-			return (char) (((this.buffer[this.position++] & 0xff) << 8) | (this.buffer[this.position++] & 0xff));
-		} else {
-			throw new EOFException();
-		}
-	}
-
-	@Override
-	public double readDouble() throws IOException {
-		return Double.longBitsToDouble(readLong());
-	}
-
-	@Override
-	public float readFloat() throws IOException {
-		return Float.intBitsToFloat(readInt());
-	}
-
-	@Override
-	public void readFully(byte[] b) throws IOException {
-		readFully(b, 0, b.length);
-	}
-
-	@Override
-	public void readFully(byte[] b, int off, int len) throws IOException {
-		if (len >= 0) {
-			if (off <= b.length - len) {
-				if (this.position <= this.end - len) {
-					System.arraycopy(this.buffer, position, b, off, len);
-					position += len;
-				} else {
-					throw new EOFException();
-				}
-			} else {
-				throw new ArrayIndexOutOfBoundsException();
-			}
-		} else if (len < 0) {
-			throw new IllegalArgumentException("Length may not be negative.");
-		}
-	}
-
-	@Override
-	public int readInt() throws IOException {
-		if (this.position >= 0 && this.position < this.end - 3) {
-			@SuppressWarnings("restriction")
-			int value = UNSAFE.getInt(this.buffer, BASE_OFFSET + this.position);
-			if (LITTLE_ENDIAN) {
-				value = Integer.reverseBytes(value);
-			}
-			
-			this.position += 4;
-			return value;
-		} else {
-			throw new EOFException();
-		}
-	}
-
-	@Override
-	public String readLine() throws IOException {
-		if (this.position < this.end) {
-			// read until a newline is found
-			StringBuilder bld = new StringBuilder();
-			char curr = (char) readUnsignedByte();
-			while (position < this.end && curr != '\n') {
-				bld.append(curr);
-				curr = (char) readUnsignedByte();
-			}
-			// trim a trailing carriage return
-			int len = bld.length();
-			if (len > 0 && bld.charAt(len - 1) == '\r') {
-				bld.setLength(len - 1);
-			}
-			String s = bld.toString();
-			bld.setLength(0);
-			return s;
-		} else {
-			return null;
-		}
-	}
-
-	@Override
-	public long readLong() throws IOException {
-		if (position >= 0 && position < this.end - 7) {
-			@SuppressWarnings("restriction")
-			long value = UNSAFE.getLong(this.buffer, BASE_OFFSET + this.position);
-			if (LITTLE_ENDIAN) {
-				value = Long.reverseBytes(value);
-			}
-			this.position += 8;
-			return value;
-		} else {
-			throw new EOFException();
-		}
-	}
-
-	@Override
-	public short readShort() throws IOException {
-		if (position >= 0 && position < this.end - 1) {
-			return (short) ((((this.buffer[position++]) & 0xff) << 8) | ((this.buffer[position++]) & 0xff));
-		} else {
-			throw new EOFException();
-		}
-	}
-
-	@Override
-	public String readUTF() throws IOException {
-		int utflen = readUnsignedShort();
-		byte[] bytearr = new byte[utflen];
-		char[] chararr = new char[utflen];
-
-		int c, char2, char3;
-		int count = 0;
-		int chararr_count = 0;
-
-		readFully(bytearr, 0, utflen);
-
-		while (count < utflen) {
-			c = (int) bytearr[count] & 0xff;
-			if (c > 127) {
-				break;
-			}
-			count++;
-			chararr[chararr_count++] = (char) c;
-		}
-
-		while (count < utflen) {
-			c = (int) bytearr[count] & 0xff;
-			switch (c >> 4) {
-			case 0:
-			case 1:
-			case 2:
-			case 3:
-			case 4:
-			case 5:
-			case 6:
-			case 7:
-				/* 0xxxxxxx */
-				count++;
-				chararr[chararr_count++] = (char) c;
-				break;
-			case 12:
-			case 13:
-				/* 110x xxxx 10xx xxxx */
-				count += 2;
-				if (count > utflen) {
-					throw new UTFDataFormatException("malformed input: partial character at end");
-				}
-				char2 = (int) bytearr[count - 1];
-				if ((char2 & 0xC0) != 0x80) {
-					throw new UTFDataFormatException("malformed input around byte " + count);
-				}
-				chararr[chararr_count++] = (char) (((c & 0x1F) << 6) | (char2 & 0x3F));
-				break;
-			case 14:
-				/* 1110 xxxx 10xx xxxx 10xx xxxx */
-				count += 3;
-				if (count > utflen) {
-					throw new UTFDataFormatException("malformed input: partial character at end");
-				}
-				char2 = (int) bytearr[count - 2];
-				char3 = (int) bytearr[count - 1];
-				if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80)) {
-					throw new UTFDataFormatException("malformed input around byte " + (count - 1));
-				}
-				chararr[chararr_count++] = (char) (((c & 0x0F) << 12) | ((char2 & 0x3F) << 6) | (char3 & 0x3F));
-				break;
-			default:
-				/* 10xx xxxx, 1111 xxxx */
-				throw new UTFDataFormatException("malformed input around byte " + count);
-			}
-		}
-		// The number of chars produced may be less than utflen
-		return new String(chararr, 0, chararr_count);
-	}
-
-	@Override
-	public int readUnsignedByte() throws IOException {
-		if (this.position < this.end) {
-			return (this.buffer[this.position++] & 0xff);
-		} else {
-			throw new EOFException();
-		}
-	}
-
-	@Override
-	public int readUnsignedShort() throws IOException {
-		if (this.position < this.end - 1) {
-			return ((this.buffer[this.position++] & 0xff) << 8) | (this.buffer[this.position++] & 0xff);
-		} else {
-			throw new EOFException();
-		}
-	}
-	
-	@Override
-	public int skipBytes(int n) throws IOException {
-		if (this.position <= this.end - n) {
-			this.position += n;
-			return n;
-		} else {
-			n = this.end - this.position;
-			this.position = this.end;
-			return n;
-		}
-	}
-
-	@Override
-	public void skipBytesToRead(int numBytes) throws IOException {
-		int skippedBytes = skipBytes(numBytes);
-
-		if (skippedBytes < numBytes){
-			throw new EOFException("Could not skip " + numBytes +" bytes.");
-		}
-	}
-
-	@Override
-	public int read(byte[] b, int off, int len) throws IOException {
-		if (b == null){
-			throw new NullPointerException("Byte array b cannot be null.");
-		}
-
-		if (off < 0){
-			throw new IndexOutOfBoundsException("Offset cannot be negative.");
-		}
-
-		if (len < 0){
-			throw new IndexOutOfBoundsException("Length cannot be negative.");
-		}
-
-		if (b.length - off < len){
-			throw new IndexOutOfBoundsException("Byte array does not provide enough space to store requested data" +
-					".");
-		}
-
-		if (this.position >= this.end) {
-			return -1;
-		} else {
-			int toRead = Math.min(this.end-this.position, len);
-			System.arraycopy(this.buffer,this.position,b,off,toRead);
-			this.position += toRead;
-
-			return toRead;
-		}
-	}
-
-	@Override
-	public int read(byte[] b) throws IOException {
-		return read(b, 0, b.length);
-	}
-
-	// ------------------------------------------------------------------------
-	//  Utilities
-	// ------------------------------------------------------------------------
-
-	@SuppressWarnings("restriction")
-	private static final sun.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE;
-
-	@SuppressWarnings("restriction")
-	private static final long BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class);
-
-	private static final boolean LITTLE_ENDIAN = (MemoryUtils.NATIVE_BYTE_ORDER == ByteOrder.LITTLE_ENDIAN);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataOutputSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataOutputSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataOutputSerializer.java
deleted file mode 100644
index 4f1cf77..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataOutputSerializer.java
+++ /dev/null
@@ -1,342 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.util;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.MemoryUtils;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.UTFDataFormatException;
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-import java.util.Arrays;
-
-/**
- * A simple and efficient serializer for the {@link java.io.DataOutput} interface.
- */
-public class DataOutputSerializer implements DataOutputView {
-	
-	private static final Logger LOG = LoggerFactory.getLogger(DataOutputSerializer.class);
-	
-	private static final int PRUNE_BUFFER_THRESHOLD = 5 * 1024 * 1024;
-
-	// ------------------------------------------------------------------------
-	
-	private final byte[] startBuffer;
-	
-	private byte[] buffer;
-	
-	private int position;
-
-	private ByteBuffer wrapper;
-
-	// ------------------------------------------------------------------------
-	
-	public DataOutputSerializer(int startSize) {
-		if (startSize < 1) {
-			throw new IllegalArgumentException();
-		}
-
-		this.startBuffer = new byte[startSize];
-		this.buffer = this.startBuffer;
-		this.wrapper = ByteBuffer.wrap(buffer);
-	}
-	
-	public ByteBuffer wrapAsByteBuffer() {
-		this.wrapper.position(0);
-		this.wrapper.limit(this.position);
-		return this.wrapper;
-	}
-
-	public byte[] getByteArray() {
-		return buffer;
-	}
-	
-	public byte[] getCopyOfBuffer() {
-		return Arrays.copyOf(buffer, position);
-	}
-
-	public void clear() {
-		this.position = 0;
-	}
-
-	public int length() {
-		return this.position;
-	}
-	
-	public void pruneBuffer() {
-		if (this.buffer.length > PRUNE_BUFFER_THRESHOLD) {
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("Releasing serialization buffer of " + this.buffer.length + " bytes.");
-			}
-			
-			this.buffer = this.startBuffer;
-			this.wrapper = ByteBuffer.wrap(this.buffer);
-		}
-	}
-
-	@Override
-	public String toString() {
-		return String.format("[pos=%d cap=%d]", this.position, this.buffer.length);
-	}
-
-	// ----------------------------------------------------------------------------------------
-	//                               Data Output
-	// ----------------------------------------------------------------------------------------
-	
-	@Override
-	public void write(int b) throws IOException {
-		if (this.position >= this.buffer.length) {
-			resize(1);
-		}
-		this.buffer[this.position++] = (byte) (b & 0xff);
-	}
-
-	@Override
-	public void write(byte[] b) throws IOException {
-		write(b, 0, b.length);
-	}
-
-	@Override
-	public void write(byte[] b, int off, int len) throws IOException {
-		if (len < 0 || off > b.length - len) {
-			throw new ArrayIndexOutOfBoundsException();
-		}
-		if (this.position > this.buffer.length - len) {
-			resize(len);
-		}
-		System.arraycopy(b, off, this.buffer, this.position, len);
-		this.position += len;
-	}
-
-	@Override
-	public void writeBoolean(boolean v) throws IOException {
-		write(v ? 1 : 0);
-	}
-
-	@Override
-	public void writeByte(int v) throws IOException {
-		write(v);
-	}
-
-	@Override
-	public void writeBytes(String s) throws IOException {
-		final int sLen = s.length();
-		if (this.position >= this.buffer.length - sLen) {
-			resize(sLen);
-		}
-		
-		for (int i = 0; i < sLen; i++) {
-			writeByte(s.charAt(i));
-		}
-		this.position += sLen;
-	}
-
-	@Override
-	public void writeChar(int v) throws IOException {
-		if (this.position >= this.buffer.length - 1) {
-			resize(2);
-		}
-		this.buffer[this.position++] = (byte) (v >> 8);
-		this.buffer[this.position++] = (byte) v;
-	}
-
-	@Override
-	public void writeChars(String s) throws IOException {
-		final int sLen = s.length();
-		if (this.position >= this.buffer.length - 2*sLen) {
-			resize(2*sLen);
-		} 
-		for (int i = 0; i < sLen; i++) {
-			writeChar(s.charAt(i));
-		}
-	}
-
-	@Override
-	public void writeDouble(double v) throws IOException {
-		writeLong(Double.doubleToLongBits(v));
-	}
-
-	@Override
-	public void writeFloat(float v) throws IOException {
-		writeInt(Float.floatToIntBits(v));
-	}
-
-	@SuppressWarnings("restriction")
-	@Override
-	public void writeInt(int v) throws IOException {
-		if (this.position >= this.buffer.length - 3) {
-			resize(4);
-		}
-		if (LITTLE_ENDIAN) {
-			v = Integer.reverseBytes(v);
-		}			
-		UNSAFE.putInt(this.buffer, BASE_OFFSET + this.position, v);
-		this.position += 4;
-	}
-
-	@SuppressWarnings("restriction")
-	@Override
-	public void writeLong(long v) throws IOException {
-		if (this.position >= this.buffer.length - 7) {
-			resize(8);
-		}
-		if (LITTLE_ENDIAN) {
-			v = Long.reverseBytes(v);
-		}
-		UNSAFE.putLong(this.buffer, BASE_OFFSET + this.position, v);
-		this.position += 8;
-	}
-
-	@Override
-	public void writeShort(int v) throws IOException {
-		if (this.position >= this.buffer.length - 1) {
-			resize(2);
-		}
-		this.buffer[this.position++] = (byte) ((v >>> 8) & 0xff);
-		this.buffer[this.position++] = (byte) ((v >>> 0) & 0xff);
-	}
-
-	@Override
-	public void writeUTF(String str) throws IOException {
-		int strlen = str.length();
-		int utflen = 0;
-		int c;
-
-		/* use charAt instead of copying String to char array */
-		for (int i = 0; i < strlen; i++) {
-			c = str.charAt(i);
-			if ((c >= 0x0001) && (c <= 0x007F)) {
-				utflen++;
-			} else if (c > 0x07FF) {
-				utflen += 3;
-			} else {
-				utflen += 2;
-			}
-		}
-
-		if (utflen > 65535) {
-			throw new UTFDataFormatException("Encoded string is too long: " + utflen);
-		}
-		else if (this.position > this.buffer.length - utflen - 2) {
-			resize(utflen + 2);
-		}
-		
-		byte[] bytearr = this.buffer;
-		int count = this.position;
-
-		bytearr[count++] = (byte) ((utflen >>> 8) & 0xFF);
-		bytearr[count++] = (byte) ((utflen >>> 0) & 0xFF);
-
-		int i = 0;
-		for (i = 0; i < strlen; i++) {
-			c = str.charAt(i);
-			if (!((c >= 0x0001) && (c <= 0x007F))) {
-				break;
-			}
-			bytearr[count++] = (byte) c;
-		}
-
-		for (; i < strlen; i++) {
-			c = str.charAt(i);
-			if ((c >= 0x0001) && (c <= 0x007F)) {
-				bytearr[count++] = (byte) c;
-
-			} else if (c > 0x07FF) {
-				bytearr[count++] = (byte) (0xE0 | ((c >> 12) & 0x0F));
-				bytearr[count++] = (byte) (0x80 | ((c >> 6) & 0x3F));
-				bytearr[count++] = (byte) (0x80 | ((c >> 0) & 0x3F));
-			} else {
-				bytearr[count++] = (byte) (0xC0 | ((c >> 6) & 0x1F));
-				bytearr[count++] = (byte) (0x80 | ((c >> 0) & 0x3F));
-			}
-		}
-
-		this.position = count;
-	}
-	
-	
-	private void resize(int minCapacityAdd) throws IOException {
-		int newLen = Math.max(this.buffer.length * 2, this.buffer.length + minCapacityAdd);
-		byte[] nb;
-		try {
-			nb = new byte[newLen];
-		}
-		catch (NegativeArraySizeException e) {
-			throw new IOException("Serialization failed because the record length would exceed 2GB (max addressable array size in Java).");
-		}
-		catch (OutOfMemoryError e) {
-			// this was too large to allocate, try the smaller size (if possible)
-			if (newLen > this.buffer.length + minCapacityAdd) {
-				newLen = this.buffer.length + minCapacityAdd;
-				try {
-					nb = new byte[newLen];
-				}
-				catch (OutOfMemoryError ee) {
-					// still not possible. give an informative exception message that reports the size
-					throw new IOException("Failed to serialize element. Serialized size (> "
-							+ newLen + " bytes) exceeds JVM heap space", ee);
-				}
-			} else {
-				throw new IOException("Failed to serialize element. Serialized size (> "
-						+ newLen + " bytes) exceeds JVM heap space", e);
-			}
-		}
-
-		System.arraycopy(this.buffer, 0, nb, 0, this.position);
-		this.buffer = nb;
-		this.wrapper = ByteBuffer.wrap(this.buffer);
-	}
-
-	@Override
-	public void skipBytesToWrite(int numBytes) throws IOException {
-		if(buffer.length - this.position < numBytes){
-			throw new EOFException("Could not skip " + numBytes + " bytes.");
-		}
-
-		this.position += numBytes;
-	}
-
-	@Override
-	public void write(DataInputView source, int numBytes) throws IOException {
-		if(buffer.length - this.position < numBytes){
-			throw new EOFException("Could not write " + numBytes + " bytes. Buffer overflow.");
-		}
-
-		source.readFully(this.buffer, this.position, numBytes);
-		this.position += numBytes;
-	}
-
-	// ------------------------------------------------------------------------
-	//  Utilities
-	// ------------------------------------------------------------------------
-
-	@SuppressWarnings("restriction")
-	private static final sun.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE;
-
-	@SuppressWarnings("restriction")
-	private static final long BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class);
-
-	private static final boolean LITTLE_ENDIAN = (MemoryUtils.NATIVE_BYTE_ORDER == ByteOrder.LITTLE_ENDIAN);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java
index c8bffb5..7e652ad 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java
@@ -19,8 +19,8 @@
 package org.apache.flink.runtime.io.network.api;
 
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
-import org.apache.flink.runtime.util.DataInputDeserializer;
-import org.apache.flink.runtime.util.DataOutputSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
 
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/PagedViewsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/PagedViewsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/PagedViewsTest.java
index 8455402..aad7ee1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/PagedViewsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/PagedViewsTest.java
@@ -20,9 +20,9 @@ package org.apache.flink.runtime.io.network.api.serialization;
 
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
-import org.apache.flink.runtime.io.network.api.serialization.types.SerializationTestType;
-import org.apache.flink.runtime.io.network.api.serialization.types.SerializationTestTypeFactory;
-import org.apache.flink.runtime.io.network.api.serialization.types.Util;
+import org.apache.flink.testutils.serialization.types.SerializationTestType;
+import org.apache.flink.testutils.serialization.types.SerializationTestTypeFactory;
+import org.apache.flink.testutils.serialization.types.Util;
 import org.apache.flink.runtime.memory.AbstractPagedInputView;
 import org.apache.flink.runtime.memory.AbstractPagedOutputView;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
index 9d0ee67..f988c55 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
@@ -20,9 +20,9 @@ package org.apache.flink.runtime.io.network.api.serialization;
 
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
-import org.apache.flink.runtime.io.network.api.serialization.types.SerializationTestType;
-import org.apache.flink.runtime.io.network.api.serialization.types.SerializationTestTypeFactory;
-import org.apache.flink.runtime.io.network.api.serialization.types.Util;
+import org.apache.flink.testutils.serialization.types.SerializationTestType;
+import org.apache.flink.testutils.serialization.types.SerializationTestTypeFactory;
+import org.apache.flink.testutils.serialization.types.Util;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java
index b7bcb3e..fe9a386 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java
@@ -22,9 +22,9 @@ import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
-import org.apache.flink.runtime.io.network.api.serialization.types.SerializationTestType;
-import org.apache.flink.runtime.io.network.api.serialization.types.SerializationTestTypeFactory;
-import org.apache.flink.runtime.io.network.api.serialization.types.Util;
+import org.apache.flink.testutils.serialization.types.SerializationTestType;
+import org.apache.flink.testutils.serialization.types.SerializationTestTypeFactory;
+import org.apache.flink.testutils.serialization.types.Util;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
 import org.junit.Assert;

http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/AsciiStringType.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/AsciiStringType.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/AsciiStringType.java
deleted file mode 100644
index 74de096..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/AsciiStringType.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network.api.serialization.types;
-
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
-import java.io.IOException;
-import java.util.Random;
-
-public class AsciiStringType implements SerializationTestType {
-
-	private static final int MAX_LEN = 1500;
-
-	public String value;
-
-	public AsciiStringType() {
-		this.value = "";
-	}
-
-	private AsciiStringType(String value) {
-		this.value = value;
-	}
-
-	@Override
-	public AsciiStringType getRandom(Random rnd) {
-		final StringBuilder bld = new StringBuilder();
-		final int len = rnd.nextInt(MAX_LEN + 1);
-
-		for (int i = 0; i < len; i++) {
-			// 1--127
-			bld.append((char) (rnd.nextInt(126) + 1));
-		}
-
-		return new AsciiStringType(bld.toString());
-	}
-
-	@Override
-	public int length() {
-		return value.getBytes(ConfigConstants.DEFAULT_CHARSET).length + 2;
-	}
-
-	@Override
-	public void write(DataOutputView out) throws IOException {
-		out.writeUTF(this.value);
-	}
-
-	@Override
-	public void read(DataInputView in) throws IOException {
-		this.value = in.readUTF();
-	}
-
-	@Override
-	public int hashCode() {
-		return this.value.hashCode();
-	}
-
-	@Override
-	public boolean equals(Object obj) {
-		if (obj instanceof AsciiStringType) {
-			AsciiStringType other = (AsciiStringType) obj;
-			return this.value.equals(other.value);
-		} else {
-			return false;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/BooleanType.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/BooleanType.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/BooleanType.java
deleted file mode 100644
index 66b099d..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/BooleanType.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network.api.serialization.types;
-
-import java.io.IOException;
-import java.util.Random;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
-public class BooleanType implements SerializationTestType {
-
-	private boolean value;
-
-	public BooleanType() {
-		this.value = false;
-	}
-
-	private BooleanType(boolean value) {
-		this.value = value;
-	}
-
-	@Override
-	public BooleanType getRandom(Random rnd) {
-		return new BooleanType(rnd.nextBoolean());
-	}
-
-	@Override
-	public int length() {
-		return 1;
-	}
-
-	@Override
-	public void write(DataOutputView out) throws IOException {
-		out.writeBoolean(this.value);
-	}
-
-	@Override
-	public void read(DataInputView in) throws IOException {
-		this.value = in.readBoolean();
-	}
-
-	@Override
-	public int hashCode() {
-		return this.value ? 1 : 0;
-	}
-
-	@Override
-	public boolean equals(Object obj) {
-		if (obj instanceof BooleanType) {
-			BooleanType other = (BooleanType) obj;
-			return this.value == other.value;
-		} else {
-			return false;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/ByteArrayType.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/ByteArrayType.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/ByteArrayType.java
deleted file mode 100644
index 66fa22c..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/ByteArrayType.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network.api.serialization.types;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Random;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
-public class ByteArrayType implements SerializationTestType {
-
-	private static final int MAX_LEN = 512 * 15;
-
-	private byte[] data;
-
-	public ByteArrayType() {
-		this.data = new byte[0];
-	}
-
-	public ByteArrayType(byte[] data) {
-		this.data = data;
-	}
-
-	@Override
-	public ByteArrayType getRandom(Random rnd) {
-		final int len = rnd.nextInt(MAX_LEN) + 1;
-		final byte[] data = new byte[len];
-		rnd.nextBytes(data);
-		return new ByteArrayType(data);
-	}
-
-	@Override
-	public int length() {
-		return data.length + 4;
-	}
-
-	@Override
-	public void write(DataOutputView out) throws IOException {
-		out.writeInt(this.data.length);
-		out.write(this.data);
-	}
-
-	@Override
-	public void read(DataInputView in) throws IOException {
-		final int len = in.readInt();
-		this.data = new byte[len];
-		in.readFully(this.data);
-	}
-
-	@Override
-	public int hashCode() {
-		return Arrays.hashCode(this.data);
-	}
-
-	@Override
-	public boolean equals(Object obj) {
-		if (obj instanceof ByteArrayType) {
-			ByteArrayType other = (ByteArrayType) obj;
-			return Arrays.equals(this.data, other.data);
-		} else {
-			return false;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/ByteSubArrayType.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/ByteSubArrayType.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/ByteSubArrayType.java
deleted file mode 100644
index 6431f14..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/ByteSubArrayType.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network.api.serialization.types;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Random;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
-public class ByteSubArrayType implements SerializationTestType {
-
-	private static final int MAX_LEN = 512;
-
-	private final byte[] data;
-
-	private int len;
-
-	public ByteSubArrayType() {
-		this.data = new byte[MAX_LEN];
-		this.len = 0;
-	}
-
-	@Override
-	public ByteSubArrayType getRandom(Random rnd) {
-		final int len = rnd.nextInt(MAX_LEN) + 1;
-		final ByteSubArrayType t = new ByteSubArrayType();
-		t.len = len;
-
-		final byte[] data = t.data;
-		for (int i = 0; i < len; i++) {
-			data[i] = (byte) rnd.nextInt(256);
-		}
-
-		return t;
-	}
-
-	@Override
-	public int length() {
-		return len + 4;
-	}
-
-	@Override
-	public void write(DataOutputView out) throws IOException {
-		out.writeInt(this.len);
-		out.write(this.data, 0, this.len);
-	}
-
-	@Override
-	public void read(DataInputView in) throws IOException {
-		this.len = in.readInt();
-		in.readFully(this.data, 0, this.len);
-	}
-
-	@Override
-	public int hashCode() {
-		final byte[] copy = new byte[this.len];
-		System.arraycopy(this.data, 0, copy, 0, this.len);
-		return Arrays.hashCode(copy);
-	}
-
-	@Override
-	public boolean equals(Object obj) {
-		if (obj instanceof ByteSubArrayType) {
-			ByteSubArrayType other = (ByteSubArrayType) obj;
-			if (this.len == other.len) {
-				for (int i = 0; i < this.len; i++) {
-					if (this.data[i] != other.data[i]) {
-						return false;
-					}
-				}
-				return true;
-			} else {
-				return false;
-			}
-		} else {
-			return false;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/ByteType.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/ByteType.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/ByteType.java
deleted file mode 100644
index 87fd7c0..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/ByteType.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network.api.serialization.types;
-
-import java.io.IOException;
-import java.util.Random;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
-public class ByteType implements SerializationTestType {
-
-	private byte value;
-
-	public ByteType() {
-		this.value = (byte) 0;
-	}
-
-	private ByteType(byte value) {
-		this.value = value;
-	}
-
-	@Override
-	public ByteType getRandom(Random rnd) {
-		return new ByteType((byte) rnd.nextInt(256));
-	}
-
-	@Override
-	public int length() {
-		return 1;
-	}
-
-	@Override
-	public void write(DataOutputView out) throws IOException {
-		out.writeByte(this.value);
-	}
-
-	@Override
-	public void read(DataInputView in) throws IOException {
-		this.value = in.readByte();
-	}
-
-	@Override
-	public int hashCode() {
-		return this.value;
-	}
-
-	@Override
-	public boolean equals(Object obj) {
-		if (obj instanceof ByteType) {
-			ByteType other = (ByteType) obj;
-			return this.value == other.value;
-		} else {
-			return false;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/CharType.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/CharType.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/CharType.java
deleted file mode 100644
index b162ea0..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/CharType.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network.api.serialization.types;
-
-import java.io.IOException;
-import java.util.Random;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
-public class CharType implements SerializationTestType {
-
-	private char value;
-
-	public CharType() {
-		this.value = 0;
-	}
-
-	private CharType(char value) {
-		this.value = value;
-	}
-
-	@Override
-	public CharType getRandom(Random rnd) {
-		return new CharType((char) rnd.nextInt(10000));
-	}
-
-	@Override
-	public int length() {
-		return 2;
-	}
-
-	@Override
-	public void write(DataOutputView out) throws IOException {
-		out.writeChar(this.value);
-	}
-
-	@Override
-	public void read(DataInputView in) throws IOException {
-		this.value = in.readChar();
-	}
-
-	@Override
-	public int hashCode() {
-		return this.value;
-	}
-
-	@Override
-	public boolean equals(Object obj) {
-		if (obj instanceof CharType) {
-			CharType other = (CharType) obj;
-			return this.value == other.value;
-		} else {
-			return false;
-		}
-	}
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/DoubleType.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/DoubleType.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/DoubleType.java
deleted file mode 100644
index 654b685..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/DoubleType.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network.api.serialization.types;
-
-import java.io.IOException;
-import java.util.Random;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
-public class DoubleType implements SerializationTestType {
-
-	private double value;
-
-	public DoubleType() {
-		this.value = 0;
-	}
-
-	private DoubleType(double value) {
-		this.value = value;
-	}
-
-	@Override
-	public DoubleType getRandom(Random rnd) {
-		return new DoubleType(rnd.nextDouble());
-	}
-
-	@Override
-	public int length() {
-		return 8;
-	}
-
-	@Override
-	public void write(DataOutputView out) throws IOException {
-		out.writeDouble(this.value);
-	}
-
-	@Override
-	public void read(DataInputView in) throws IOException {
-		this.value = in.readDouble();
-	}
-
-	@Override
-	public int hashCode() {
-		final long l = Double.doubleToLongBits(this.value);
-		return (int) (l ^ l >>> 32);
-	}
-
-	@Override
-	public boolean equals(Object obj) {
-		if (obj instanceof DoubleType) {
-			DoubleType other = (DoubleType) obj;
-			return Double.doubleToLongBits(this.value) == Double.doubleToLongBits(other.value);
-		} else {
-			return false;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/FloatType.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/FloatType.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/FloatType.java
deleted file mode 100644
index 653be45..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/FloatType.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network.api.serialization.types;
-
-import java.io.IOException;
-import java.util.Random;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
-public class FloatType implements SerializationTestType {
-
-	private float value;
-
-	public FloatType() {
-		this.value = 0;
-	}
-
-	private FloatType(float value) {
-		this.value = value;
-	}
-
-	@Override
-	public FloatType getRandom(Random rnd) {
-		return new FloatType(rnd.nextFloat());
-	}
-
-	@Override
-	public int length() {
-		return 4;
-	}
-
-	@Override
-	public void write(DataOutputView out) throws IOException {
-		out.writeFloat(this.value);
-	}
-
-	@Override
-	public void read(DataInputView in) throws IOException {
-		this.value = in.readFloat();
-	}
-
-	@Override
-	public int hashCode() {
-		return Float.floatToIntBits(this.value);
-	}
-
-	@Override
-	public boolean equals(Object obj) {
-		if (obj instanceof FloatType) {
-			FloatType other = (FloatType) obj;
-			return Float.floatToIntBits(this.value) == Float.floatToIntBits(other.value);
-		} else {
-			return false;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/IntType.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/IntType.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/IntType.java
deleted file mode 100644
index 4c6429d..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/IntType.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network.api.serialization.types;
-
-import java.io.IOException;
-import java.util.Random;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
-public class IntType implements SerializationTestType {
-
-	private int value;
-
-	public IntType() {
-		this.value = 0;
-	}
-
-	public IntType(int value) {
-		this.value = value;
-	}
-
-	@Override
-	public IntType getRandom(Random rnd) {
-		return new IntType(rnd.nextInt());
-	}
-
-	@Override
-	public int length() {
-		return 4;
-	}
-
-	@Override
-	public void write(DataOutputView out) throws IOException {
-		out.writeInt(this.value);
-	}
-
-	@Override
-	public void read(DataInputView in) throws IOException {
-		this.value = in.readInt();
-	}
-
-	@Override
-	public int hashCode() {
-		return this.value;
-	}
-
-	@Override
-	public boolean equals(Object obj) {
-		if (obj instanceof IntType) {
-			IntType other = (IntType) obj;
-			return this.value == other.value;
-		} else {
-			return false;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/LongType.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/LongType.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/LongType.java
deleted file mode 100644
index 934dfb7..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/LongType.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network.api.serialization.types;
-
-import java.io.IOException;
-import java.util.Random;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
-public class LongType implements SerializationTestType {
-
-	private long value;
-
-	public LongType() {
-		this.value = 0;
-	}
-
-	private LongType(long value) {
-		this.value = value;
-	}
-
-	@Override
-	public LongType getRandom(Random rnd) {
-		return new LongType(rnd.nextLong());
-	}
-
-	@Override
-	public int length() {
-		return 8;
-	}
-
-	@Override
-	public void write(DataOutputView out) throws IOException {
-		out.writeLong(this.value);
-	}
-
-	@Override
-	public void read(DataInputView in) throws IOException {
-		this.value = in.readLong();
-	}
-
-	@Override
-	public int hashCode() {
-		return (int) (this.value ^ this.value >>> 32);
-	}
-
-	@Override
-	public boolean equals(Object obj) {
-		if (obj instanceof LongType) {
-			LongType other = (LongType) obj;
-			return this.value == other.value;
-		} else {
-			return false;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/SerializationTestType.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/SerializationTestType.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/SerializationTestType.java
deleted file mode 100644
index 69db122..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/SerializationTestType.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network.api.serialization.types;
-
-import java.util.Random;
-
-import org.apache.flink.core.io.IOReadableWritable;
-
-public interface SerializationTestType extends IOReadableWritable {
-
-	public SerializationTestType getRandom(Random rnd);
-
-	public int length();
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/SerializationTestTypeFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/SerializationTestTypeFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/SerializationTestTypeFactory.java
deleted file mode 100644
index 03a093f..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/SerializationTestTypeFactory.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network.api.serialization.types;
-
-public enum SerializationTestTypeFactory {
-	
-	BOOLEAN(new BooleanType()),
-	BYTE_ARRAY(new ByteArrayType()),
-	BYTE_SUB_ARRAY(new ByteSubArrayType()),
-	BYTE(new ByteType()),
-	CHAR(new CharType()),
-	DOUBLE(new DoubleType()),
-	FLOAT(new FloatType()),
-	INT(new IntType()),
-	LONG(new LongType()),
-	SHORT(new ShortType()),
-	UNSIGNED_BYTE(new UnsignedByteType()),
-	UNSIGNED_SHORT(new UnsignedShortType()),
-	STRING(new AsciiStringType());
-
-	private final SerializationTestType factory;
-
-	SerializationTestTypeFactory(SerializationTestType type) {
-		this.factory = type;
-	}
-
-	public SerializationTestType factory() {
-		return this.factory;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/ShortType.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/ShortType.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/ShortType.java
deleted file mode 100644
index 69e0ffc..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/ShortType.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network.api.serialization.types;
-
-import java.io.IOException;
-import java.util.Random;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
-public class ShortType implements SerializationTestType {
-
-	private short value;
-
-	public ShortType() {
-		this.value = (short) 0;
-	}
-
-	private ShortType(short value) {
-		this.value = value;
-	}
-
-	@Override
-	public ShortType getRandom(Random rnd) {
-		return new ShortType((short) rnd.nextInt(65536));
-	}
-
-	@Override
-	public int length() {
-		return 2;
-	}
-
-	@Override
-	public void write(DataOutputView out) throws IOException {
-		out.writeShort(this.value);
-	}
-
-	@Override
-	public void read(DataInputView in) throws IOException {
-		this.value = in.readShort();
-	}
-
-	@Override
-	public int hashCode() {
-		return this.value;
-	}
-
-	@Override
-	public boolean equals(Object obj) {
-		if (obj instanceof ShortType) {
-			ShortType other = (ShortType) obj;
-			return this.value == other.value;
-		} else {
-			return false;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/UnsignedByteType.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/UnsignedByteType.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/UnsignedByteType.java
deleted file mode 100644
index eabb1dd..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/UnsignedByteType.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network.api.serialization.types;
-
-import java.io.IOException;
-import java.util.Random;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
-public class UnsignedByteType implements SerializationTestType {
-
-	private int value;
-
-	public UnsignedByteType() {
-		this.value = 0;
-	}
-
-	private UnsignedByteType(int value) {
-		this.value = value;
-	}
-
-	@Override
-	public UnsignedByteType getRandom(Random rnd) {
-		return new UnsignedByteType(rnd.nextInt(128) + 128);
-	}
-
-	@Override
-	public int length() {
-		return 1;
-	}
-
-	@Override
-	public void write(DataOutputView out) throws IOException {
-		out.writeByte(this.value);
-	}
-
-	@Override
-	public void read(DataInputView in) throws IOException {
-		this.value = in.readUnsignedByte();
-	}
-
-	@Override
-	public int hashCode() {
-		return this.value;
-	}
-
-	@Override
-	public boolean equals(Object obj) {
-		if (obj instanceof UnsignedByteType) {
-			UnsignedByteType other = (UnsignedByteType) obj;
-			return this.value == other.value;
-		} else {
-			return false;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/UnsignedShortType.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/UnsignedShortType.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/UnsignedShortType.java
deleted file mode 100644
index 6242900..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/UnsignedShortType.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network.api.serialization.types;
-
-import java.io.IOException;
-import java.util.Random;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
-public class UnsignedShortType implements SerializationTestType {
-
-	private int value;
-
-	public UnsignedShortType() {
-		this.value = 0;
-	}
-
-	private UnsignedShortType(int value) {
-		this.value = value;
-	}
-
-	@Override
-	public UnsignedShortType getRandom(Random rnd) {
-		return new UnsignedShortType(rnd.nextInt(32768) + 32768);
-	}
-
-	@Override
-	public int length() {
-		return 2;
-	}
-
-	@Override
-	public void write(DataOutputView out) throws IOException {
-		out.writeShort(this.value);
-	}
-
-	@Override
-	public void read(DataInputView in) throws IOException {
-		this.value = in.readUnsignedShort();
-	}
-
-	@Override
-	public int hashCode() {
-		return this.value;
-	}
-
-	@Override
-	public boolean equals(Object obj) {
-		if (obj instanceof UnsignedShortType) {
-			UnsignedShortType other = (UnsignedShortType) obj;
-			return this.value == other.value;
-		} else {
-			return false;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/Util.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/Util.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/Util.java
deleted file mode 100644
index 110f7ce..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/Util.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.api.serialization.types;
-
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-import java.util.Random;
-
-/**
- * Utility class to help serialization for testing.
- */
-public final class Util {
-
-	private static final long SEED = 64871654635745873L;
-
-	private static Random random = new Random(SEED);
-
-	public static SerializationTestType randomRecord(SerializationTestTypeFactory type) {
-		return type.factory().getRandom(Util.random);
-	}
-
-	public static MockRecords randomRecords(final int numElements, final SerializationTestTypeFactory type) {
-
-		return new MockRecords(numElements) {
-			@Override
-			protected SerializationTestType getRecord() {
-				return type.factory().getRandom(Util.random);
-			}
-		};
-	}
-
-	public static MockRecords randomRecords(final int numElements) {
-
-		return new MockRecords(numElements) {
-			@Override
-			protected SerializationTestType getRecord() {
-				// select random test type factory
-				SerializationTestTypeFactory[] types = SerializationTestTypeFactory.values();
-				int i = Util.random.nextInt(types.length);
-
-				return types[i].factory().getRandom(Util.random);
-			}
-		};
-	}
-
-	// -----------------------------------------------------------------------------------------------------------------
-	public abstract static class MockRecords implements Iterable<SerializationTestType> {
-
-		private int numRecords;
-
-		public MockRecords(int numRecords) {
-			this.numRecords = numRecords;
-		}
-
-		@Override
-		public Iterator<SerializationTestType> iterator() {
-			return new Iterator<SerializationTestType>() {
-				@Override
-				public boolean hasNext() {
-					return numRecords > 0;
-				}
-
-				@Override
-				public SerializationTestType next() {
-					if (numRecords > 0) {
-						numRecords--;
-
-						return getRecord();
-					}
-
-					throw new NoSuchElementException();
-				}
-
-				@Override
-				public void remove() {
-					throw new UnsupportedOperationException();
-				}
-			};
-		}
-
-		abstract protected SerializationTestType getRecord();
-	}
-
-	/**
-	 * No instantiation.
-	 */
-	private Util() {
-		throw new RuntimeException();
-	}
-}