You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/09/02 19:25:42 UTC

[1/2] git commit: [FLINK-610] Added KryoSerializer

Repository: incubator-flink
Updated Branches:
  refs/heads/master d60a3169f -> dac281f40


[FLINK-610] Added KryoSerializer

This closes #74


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/22203e75
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/22203e75
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/22203e75

Branch: refs/heads/master
Commit: 22203e75f8a0d193ce0187396d0b267e05e9b58e
Parents: d60a316
Author: Till Rohrmann <ti...@gmail.com>
Authored: Thu Jul 17 17:21:59 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Sep 2 16:35:00 2014 +0200

----------------------------------------------------------------------
 .../api/java/typeutils/GenericTypeInfo.java     |   4 +-
 .../typeutils/runtime/DataInputViewStream.java  |  68 ++++
 .../typeutils/runtime/DataOutputViewStream.java |  41 ++
 .../java/typeutils/runtime/KryoSerializer.java  | 120 ++++++
 .../java/typeutils/runtime/NoFetchingInput.java | 140 +++++++
 .../AbstractGenericArraySerializerTest.java     | 186 +++++++++
 .../AbstractGenericTypeComparatorTest.java      | 376 ++++++++++++++++++
 .../AbstractGenericTypeSerializerTest.java      | 300 +++++++++++++++
 .../runtime/AvroGenericArraySerializerTest.java |  28 ++
 .../runtime/AvroGenericTypeComparatorTest.java  |  28 ++
 .../runtime/AvroGenericTypeSerializerTest.java  |  29 ++
 .../runtime/GenericArraySerializerTest.java     | 186 ---------
 .../runtime/GenericTypeComparatorTest.java      | 379 -------------------
 .../runtime/GenericTypeSerializerTest.java      | 300 ---------------
 .../runtime/KryoGenericArraySerializerTest.java |  28 ++
 .../runtime/KryoGenericTypeComparatorTest.java  |  28 ++
 .../runtime/KryoGenericTypeSerializerTest.java  |  28 ++
 .../typeutils/runtime/TupleSerializerTest.java  |  11 +-
 18 files changed, 1407 insertions(+), 873 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22203e75/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
index cbc36ed..f7f78c1 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
@@ -20,8 +20,8 @@ package org.apache.flink.api.java.typeutils;
 
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.runtime.AvroSerializer;
 import org.apache.flink.api.java.typeutils.runtime.GenericTypeComparator;
+import org.apache.flink.api.java.typeutils.runtime.KryoSerializer;
 import org.apache.flink.types.TypeInformation;
 
 
@@ -64,7 +64,7 @@ public class GenericTypeInfo<T> extends TypeInformation<T> implements AtomicType
 
 	@Override
 	public TypeSerializer<T> createSerializer() {
-		return new AvroSerializer<T>(this.typeClass);
+		return new KryoSerializer<T>(this.typeClass);
 	}
 
 	@SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22203e75/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataInputViewStream.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataInputViewStream.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataInputViewStream.java
new file mode 100644
index 0000000..81b5bae
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataInputViewStream.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.core.memory.DataInputView;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+
+public class DataInputViewStream extends InputStream{
+	protected DataInputView inputView;
+
+	public DataInputViewStream(DataInputView inputView){
+		this.inputView = inputView;
+	}
+
+	public DataInputView getInputView(){
+		return inputView;
+	}
+
+	@Override
+	public int read() throws IOException {
+		try{
+			return inputView.readUnsignedByte();
+		}catch(EOFException ex){
+			return -1;
+		}
+	}
+
+	@Override
+	public long skip(long n) throws IOException {
+		long counter = n;
+		while(counter > Integer.MAX_VALUE){
+			int skippedBytes = inputView.skipBytes(Integer.MAX_VALUE);
+
+			if(skippedBytes == 0){
+				return n - counter;
+			}
+
+			counter -= skippedBytes;
+		}
+
+		return n - counter - inputView.skipBytes((int) counter);
+	}
+
+	@Override
+	public int read(byte[] b, int off, int len) throws IOException {
+		return inputView.read(b, off, len);
+	}
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22203e75/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataOutputViewStream.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataOutputViewStream.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataOutputViewStream.java
new file mode 100644
index 0000000..6a4909e
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataOutputViewStream.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.core.memory.DataOutputView;
+import java.io.IOException;
+import java.io.OutputStream;
+
+public class DataOutputViewStream extends OutputStream {
+	protected DataOutputView outputView;
+
+	public DataOutputViewStream(DataOutputView outputView){
+		this.outputView = outputView;
+	}
+
+	@Override
+	public void write(int b) throws IOException {
+		outputView.writeByte(b);
+	}
+
+	@Override
+	public void write(byte[] b, int off, int len) throws IOException {
+		outputView.write(b, off, len);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22203e75/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
new file mode 100644
index 0000000..ad7f83a
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+
+public class KryoSerializer<T> extends TypeSerializer<T> {
+	private static final long serialVersionUID = 1L;
+
+	private final Class<T> type;
+	private final Class<? extends T> typeToInstantiate;
+
+	private transient Kryo kryo;
+	private transient T copyInstance = null;
+	private transient Input in = null;
+
+	public KryoSerializer(Class<T> type){
+		this(type,type);
+	}
+
+	public KryoSerializer(Class<T> type, Class<? extends T> typeToInstantiate){
+		if(type == null || typeToInstantiate == null){
+			throw new NullPointerException("Type class cannot be null.");
+		}
+
+		this.type = type;
+		this.typeToInstantiate = typeToInstantiate;
+		kryo = new Kryo();
+		kryo.setAsmEnabled(true);
+		kryo.register(type);
+	}
+
+	@Override
+	public boolean isImmutableType() {
+		return false;
+	}
+
+	@Override
+	public boolean isStateful() {
+		return true;
+	}
+
+	@Override
+	public T createInstance() {
+		checkKryoInitialized();
+		return kryo.newInstance(typeToInstantiate);
+	}
+
+	@Override
+	public T copy(T from, T reuse) {
+		checkKryoInitialized();
+		reuse = kryo.copy(from);
+		return reuse;
+	}
+
+	@Override
+	public int getLength() {
+		return -1;
+	}
+
+	@Override
+	public void serialize(T record, DataOutputView target) throws IOException {
+		checkKryoInitialized();
+		DataOutputViewStream outputStream = new DataOutputViewStream(target);
+		Output out = new Output(outputStream);
+		kryo.writeObject(out, record);
+		out.flush();
+	}
+
+	@Override
+	public T deserialize(T reuse, DataInputView source) throws IOException {
+		checkKryoInitialized();
+		DataInputViewStream inputStream = new DataInputViewStream(source);
+		Input in = new NoFetchingInput(inputStream);
+		reuse = kryo.readObject(in, typeToInstantiate);
+		return reuse;
+	}
+
+	@Override
+	public void copy(DataInputView source, DataOutputView target) throws IOException {
+		checkKryoInitialized();
+		if(this.copyInstance == null){
+			this.copyInstance = createInstance();
+		}
+
+		T tmp = deserialize(copyInstance, source);
+		serialize(tmp, target);
+	}
+
+	private final void checkKryoInitialized() {
+		if (this.kryo == null) {
+			this.kryo = new Kryo();
+			this.kryo.setAsmEnabled(true);
+			this.kryo.register(type);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22203e75/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/NoFetchingInput.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/NoFetchingInput.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/NoFetchingInput.java
new file mode 100644
index 0000000..5462000
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/NoFetchingInput.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import com.esotericsoftware.kryo.KryoException;
+import com.esotericsoftware.kryo.io.Input;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+public class NoFetchingInput extends Input {
+	public NoFetchingInput(InputStream inputStream){
+		super(inputStream, 8);
+	}
+
+	@Override
+	public boolean eof(){
+		throw new UnsupportedOperationException("NoFetchingInput does not support EOF.");
+	}
+
+	@Override
+	public int read() throws KryoException {
+		require(1);
+		return buffer[position++] & 0xFF;
+	}
+
+	@Override
+	public boolean canReadInt() throws KryoException {
+		throw new UnsupportedOperationException("NoFetchingInput cannot prefetch data.");
+	}
+
+	@Override
+	public boolean canReadLong() throws KryoException {
+		throw new UnsupportedOperationException("NoFetchingInput cannot prefetch data.");
+	}
+
+	/**
+	 * Require makes sure that at least required number of bytes are kept in the buffer. If not, then
+	 * it will load exactly the difference between required and currently available number of bytes.
+	 * Thus, it will only load the data which is required and never prefetch data.
+	 *
+	 * @param required the number of bytes being available in the buffer
+	 * @return the number of bytes remaining, which is equal to required
+	 * @throws KryoException
+	 */
+	@Override
+	protected int require(int required) throws KryoException {
+		if(required > capacity) {
+			throw new KryoException("Buffer too small: capacity: " + capacity + ", " +
+					"required: " + required);
+		}
+
+		position = 0;
+		int bytesRead = 0;
+		int count;
+		while(true){
+			count = fill(buffer, bytesRead, required - bytesRead);
+
+			if(count == -1){
+				throw new KryoException("Buffer underflow");
+			}
+
+			bytesRead += count;
+			if(bytesRead == required){
+				break;
+			}
+		}
+		limit = required;
+		return required;
+	}
+
+	@Override
+	public int read(byte[] bytes, int offset, int count) throws KryoException {
+		if(bytes == null){
+			throw new IllegalArgumentException("bytes cannot be null.");
+		}
+
+		try {
+			return inputStream.read(bytes, offset, count);
+		}catch(IOException ex){
+			throw new KryoException(ex);
+		}
+	}
+
+	@Override
+	public void skip(int count) throws KryoException {
+		try{
+			inputStream.skip(count);
+		}catch(IOException ex){
+			throw new KryoException(ex);
+		}
+	}
+
+	@Override
+	public void readBytes(byte[] bytes, int offset, int count) throws KryoException {
+		if(bytes == null){
+			throw new IllegalArgumentException("bytes cannot be null.");
+		}
+
+		try{
+			int bytesRead = 0;
+			int c;
+
+			while(true){
+				c = inputStream.read(bytes, offset+bytesRead, count-bytesRead);
+
+				if(c == -1){
+					throw new KryoException("Buffer underflow");
+				}
+
+				bytesRead += c;
+
+				if(bytesRead == count){
+					break;
+				}
+			}
+		}catch(IOException ex){
+			throw new KryoException(ex);
+		}
+	}
+
+
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22203e75/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AbstractGenericArraySerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AbstractGenericArraySerializerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AbstractGenericArraySerializerTest.java
new file mode 100644
index 0000000..0700923
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AbstractGenericArraySerializerTest.java
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import java.lang.reflect.Array;
+import java.util.ArrayList;
+import java.util.Random;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.flink.api.common.typeutils.SerializerTestInstance;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.typeutils.runtime.AbstractGenericTypeSerializerTest.Book;
+import org.apache.flink.api.java.typeutils.runtime.AbstractGenericTypeSerializerTest.BookAuthor;
+import org.apache.flink.api.java.typeutils.runtime.AbstractGenericTypeSerializerTest.ComplexNestedObject1;
+import org.apache.flink.api.java.typeutils.runtime.AbstractGenericTypeSerializerTest.ComplexNestedObject2;
+import org.apache.flink.api.java.typeutils.runtime.AbstractGenericTypeSerializerTest.SimpleTypes;
+import org.apache.flink.util.StringUtils;
+
+abstract public class AbstractGenericArraySerializerTest {
+	
+	private final Random rnd = new Random(349712539451944123L);
+	
+	
+	@Test
+	public void testString() {
+		String[] arr1 = new String[] {"abc", "",
+				StringUtils.getRandomString(new Random(289347567856686223L), 10, 100),
+				StringUtils.getRandomString(new Random(289347567856686223L), 15, 50),
+				StringUtils.getRandomString(new Random(289347567856686223L), 30, 170),
+				StringUtils.getRandomString(new Random(289347567856686223L), 14, 15),
+				""};
+		
+		String[] arr2 = new String[] {"foo", "",
+				StringUtils.getRandomString(new Random(289347567856686223L), 10, 100),
+				StringUtils.getRandomString(new Random(289347567856686223L), 1000, 5000),
+				StringUtils.getRandomString(new Random(289347567856686223L), 30000, 35000),
+				StringUtils.getRandomString(new Random(289347567856686223L), 100*1024, 105*1024),
+				"bar"};
+		
+		// run tests with the string serializer as the component serializer
+		runTests(String.class, new StringSerializer(), arr1, arr2);
+		
+		// run the tests with the generic serializer as the component serializer
+		runTests(arr1, arr2);
+	}
+	
+	@Test
+	public void testSimpleTypesObjects() {
+		SimpleTypes a = new SimpleTypes();
+		SimpleTypes b = new SimpleTypes(rnd.nextInt(), rnd.nextLong(), (byte) rnd.nextInt(),
+				StringUtils.getRandomString(rnd, 10, 100), (short) rnd.nextInt(), rnd.nextDouble());
+		SimpleTypes c = new SimpleTypes(rnd.nextInt(), rnd.nextLong(), (byte) rnd.nextInt(),
+				StringUtils.getRandomString(rnd, 10, 100), (short) rnd.nextInt(), rnd.nextDouble());
+		SimpleTypes d = new SimpleTypes(rnd.nextInt(), rnd.nextLong(), (byte) rnd.nextInt(),
+				StringUtils.getRandomString(rnd, 10, 100), (short) rnd.nextInt(), rnd.nextDouble());
+		SimpleTypes e = new SimpleTypes(rnd.nextInt(), rnd.nextLong(), (byte) rnd.nextInt(),
+				StringUtils.getRandomString(rnd, 10, 100), (short) rnd.nextInt(), rnd.nextDouble());
+		SimpleTypes f = new SimpleTypes(rnd.nextInt(), rnd.nextLong(), (byte) rnd.nextInt(),
+				StringUtils.getRandomString(rnd, 10, 100), (short) rnd.nextInt(), rnd.nextDouble());
+		SimpleTypes g = new SimpleTypes(rnd.nextInt(), rnd.nextLong(), (byte) rnd.nextInt(),
+				StringUtils.getRandomString(rnd, 10, 100), (short) rnd.nextInt(), rnd.nextDouble());
+		
+		runTests(new SimpleTypes[] {a, b, c}, new SimpleTypes[] {d, e, f, g});
+	}
+	
+	@Test
+	public void testCompositeObject() {
+		ComplexNestedObject1 o1 = new ComplexNestedObject1(5626435);
+		ComplexNestedObject1 o2 = new ComplexNestedObject1(76923);
+		ComplexNestedObject1 o3 = new ComplexNestedObject1(-1100);
+		ComplexNestedObject1 o4 = new ComplexNestedObject1(0);
+		ComplexNestedObject1 o5 = new ComplexNestedObject1(44);
+		
+		runTests(new ComplexNestedObject1[] {o1, o2}, new ComplexNestedObject1[] {o3}, new ComplexNestedObject1[] {o4, o5});
+	}
+	
+	@Test
+	public void testNestedObjects() {
+		ComplexNestedObject2 o1 = new ComplexNestedObject2(rnd);
+		ComplexNestedObject2 o2 = new ComplexNestedObject2();
+		ComplexNestedObject2 o3 = new ComplexNestedObject2(rnd);
+		ComplexNestedObject2 o4 = new ComplexNestedObject2(rnd);
+		
+		runTests(	new ComplexNestedObject2[] {o1, o2, o3},
+					new ComplexNestedObject2[] {},
+					new ComplexNestedObject2[] {},
+					new ComplexNestedObject2[] {o4},
+					new ComplexNestedObject2[] {});
+	}
+	
+	@Test
+	public void testBeanStyleObjects() {
+		{
+			Book b1 = new Book(976243875L, "The Serialization Odysse", 42);
+			Book b2 = new Book(0L, "Debugging byte streams", 1337);
+			Book b3 = new Book(-1L, "Low level interfaces", 0xC0FFEE);
+			Book b4 = new Book(Long.MAX_VALUE, "The joy of bits and bytes", 0xDEADBEEF);
+			Book b5 = new Book(Long.MIN_VALUE, "Winnign a prize for creative test strings", 0xBADF00);
+			Book b6 = new Book(-2L, "Distributed Systems", 0xABCDEF0123456789L);
+			
+			runTests(	new Book[] {b1, b2},
+						new Book[] {},
+						new Book[] {},
+						new Book[] {},
+						new Book[] {},
+						new Book[] {b3, b4, b5, b6});
+		}
+		
+		// object with collection
+		{
+			ArrayList<String> list = new ArrayList<String>();
+			list.add("A");
+			list.add("B");
+			list.add("C");
+			list.add("D");
+			list.add("E");
+			
+			BookAuthor b1 = new BookAuthor(976243875L, list, "Arno Nym");
+			
+			ArrayList<String> list2 = new ArrayList<String>();
+			BookAuthor b2 = new BookAuthor(987654321L, list2, "The Saurus");
+			
+			runTests(new BookAuthor[] {b1, b2});
+		}
+	}
+
+	private final <T> void runTests(T[]... instances) {
+		try {
+			@SuppressWarnings("unchecked")
+			Class<T> type = (Class<T>) instances[0][0].getClass();
+			TypeSerializer<T> serializer = createComponentSerializer(type);
+			runTests(type, serializer, instances);
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			Assert.fail(e.getMessage());
+		}
+	}
+	
+	private final <T> void runTests(Class<T> type, TypeSerializer<T> componentSerializer, T[]... instances) {
+		try {
+			if (type == null || componentSerializer == null || instances == null || instances.length == 0) {
+				throw new IllegalArgumentException();
+			}
+			
+			@SuppressWarnings("unchecked")
+			Class<T[]> arrayClass = (Class<T[]>) Array.newInstance(type, 0).getClass();
+			
+			GenericArraySerializer<T> serializer = createSerializer(type, componentSerializer);
+			SerializerTestInstance<T[]> test = new SerializerTestInstance<T[]>(serializer, arrayClass, -1, instances);
+			test.testAll();
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			Assert.fail(e.getMessage());
+		}
+	}
+	
+	private final <T> GenericArraySerializer<T> createSerializer(Class<T> type, TypeSerializer<T> componentSerializer) {
+		return new GenericArraySerializer<T>(type, componentSerializer);
+	}
+
+	abstract protected <T> TypeSerializer<T> createComponentSerializer(Class<T> type);
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22203e75/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AbstractGenericTypeComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AbstractGenericTypeComparatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AbstractGenericTypeComparatorTest.java
new file mode 100644
index 0000000..dee7d9a
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AbstractGenericTypeComparatorTest.java
@@ -0,0 +1,376 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.ComparatorTestBase;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+abstract public class AbstractGenericTypeComparatorTest {
+
+	@Test
+	public void testString() {
+		runTests(new String[]{
+				"",
+				"Lorem Ipsum Dolor Omit Longer",
+				"aaaa",
+				"abcd",
+				"abce",
+				"abdd",
+				"accd",
+				"bbcd"
+		});
+	}
+
+	@Test
+	public void testSimpleTypesObjects() {
+		runTests(
+				new SimpleTypes(0, 1, (byte) 2, "", (short) 3, 4.0),
+				new SimpleTypes(1, 1, (byte) 2, "", (short) 3, 4.0),
+				new SimpleTypes(1, 2, (byte) 2, "", (short) 3, 4.0),
+				new SimpleTypes(1, 2, (byte) 3, "", (short) 3, 4.0),
+				new SimpleTypes(1, 2, (byte) 3, "a", (short) 3, 4.0),
+				new SimpleTypes(1, 2, (byte) 3, "b", (short) 3, 4.0),
+				new SimpleTypes(1, 2, (byte) 3, "b", (short) 4, 4.0),
+				new SimpleTypes(1, 2, (byte) 3, "b", (short) 4, 6.0)
+		);
+	}
+
+	@Test
+	public void testCompositeObject() {
+		ComplexNestedObject1 o1 = new ComplexNestedObject1(-1100);
+		ComplexNestedObject1 o2 = new ComplexNestedObject1(0);
+		ComplexNestedObject1 o3 = new ComplexNestedObject1(44);
+		ComplexNestedObject1 o4 = new ComplexNestedObject1(76923, "A");
+		ComplexNestedObject1 o5 = new ComplexNestedObject1(5626435, "A somewhat random collection");
+
+		runTests(o1, o2, o3, o4, o5);
+	}
+
+	@Test
+	public void testBeanStyleObjects() {
+		{
+			Book b111 = new Book(-1L, "A Low level interfaces", 0xC);
+			Book b122 = new Book(-1L, "Low level interfaces", 0xC);
+			Book b123 = new Book(-1L, "Low level interfaces", 0xC0FFEE);
+
+			Book b2 = new Book(0L, "Debugging byte streams", 1337);
+			Book b3 = new Book(976243875L, "The Serialization Odysse", 42);
+
+			runTests(b111, b122, b123, b2, b3);
+		}
+
+		{
+			BookAuthor b1 = new BookAuthor(976243875L, new ArrayList<String>(), "Arno Nym");
+
+			ArrayList<String> list = new ArrayList<String>();
+			list.add("A");
+			list.add("B");
+			list.add("C");
+			list.add("D");
+			list.add("E");
+
+			BookAuthor b2 = new BookAuthor(976243875L, list, "The Saurus");
+
+			runTests(b1, b2);
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	private final <T> void runTests(T... sortedTestData) {
+		ComparatorTestInstance<T> testBase = new ComparatorTestInstance<T>(sortedTestData);
+		testBase.testAll();
+	}
+
+	abstract protected <T> TypeSerializer<T> createSerializer(Class<T> type);
+
+	// ------------------------------------------------------------------------
+	// test instance
+	// ------------------------------------------------------------------------
+
+	private class ComparatorTestInstance<T> extends ComparatorTestBase<T> {
+
+		private final T[] testData;
+
+		private final Class<T> type;
+
+		@SuppressWarnings("unchecked")
+		public ComparatorTestInstance(T[] testData) {
+			if (testData == null || testData.length == 0) {
+				throw new IllegalArgumentException();
+			}
+
+			this.testData = testData;
+			this.type = (Class<T>) testData[0].getClass();
+		}
+
+		@Override
+		@SuppressWarnings({ "unchecked", "rawtypes" })
+		protected TypeComparator<T> createComparator(boolean ascending) {
+			return new GenericTypeComparator(ascending, AbstractGenericTypeComparatorTest.this.createSerializer(this
+					.type), this.type);
+		}
+
+		@Override
+		protected TypeSerializer<T> createSerializer() {
+			return AbstractGenericTypeComparatorTest.this.createSerializer(this.type);
+		}
+
+		@Override
+		protected T[] getSortedTestData() {
+			return this.testData;
+		}
+
+		public void testAll() {
+			testDuplicate();
+			testEquality();
+			testEqualityWithReference();
+			testInequality();
+			testInequalityWithReference();
+			testNormalizedKeysEqualsFullLength();
+			testNormalizedKeysEqualsHalfLength();
+			testNormalizedKeysGreatSmallFullLength();
+			testNormalizedKeysGreatSmallAscDescHalfLength();
+			testNormalizedKeyReadWriter();
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  test objects
+	// ------------------------------------------------------------------------
+
+	public static final class SimpleTypes implements Comparable<SimpleTypes> {
+
+		private final int iVal;
+		private final long lVal;
+		private final byte bVal;
+		private final String sVal;
+		private final short rVal;
+		private final double dVal;
+
+		public SimpleTypes() {
+			this(0, 0, (byte) 0, "", (short) 0, 0);
+		}
+
+		public SimpleTypes(int iVal, long lVal, byte bVal, String sVal, short rVal, double dVal) {
+			this.iVal = iVal;
+			this.lVal = lVal;
+			this.bVal = bVal;
+			this.sVal = sVal;
+			this.rVal = rVal;
+			this.dVal = dVal;
+		}
+
+		@Override
+		public String toString() {
+			return String.format("(%d, %d, %d, %s, %d, %f)", iVal, lVal, bVal, sVal, rVal, dVal);
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			if (obj.getClass() == SimpleTypes.class) {
+				SimpleTypes other = (SimpleTypes) obj;
+
+				return other.iVal == this.iVal &&
+						other.lVal == this.lVal &&
+						other.bVal == this.bVal &&
+						other.sVal.equals(this.sVal) &&
+						other.rVal == this.rVal &&
+						other.dVal == this.dVal;
+			} else {
+				return false;
+			}
+		}
+
+		@Override
+		public int compareTo(SimpleTypes o) {
+			int cmp = (this.iVal < o.iVal ? -1 : (this.iVal == o.iVal ? 0 : 1));
+			if (cmp != 0) {
+				return cmp;
+			}
+
+			cmp = (this.lVal < o.lVal ? -1 : (this.lVal == o.lVal ? 0 : 1));
+			if (cmp != 0) {
+				return cmp;
+			}
+
+			cmp = (this.bVal < o.bVal ? -1 : (this.bVal == o.bVal ? 0 : 1));
+			if (cmp != 0) {
+				return cmp;
+			}
+
+			cmp = this.sVal.compareTo(o.sVal);
+			if (cmp != 0) {
+				return cmp;
+			}
+
+			cmp = (this.rVal < o.rVal ? -1 : (this.rVal == o.rVal ? 0 : 1));
+			if (cmp != 0) {
+				return cmp;
+			}
+
+			return (this.dVal < o.dVal ? -1 : (this.dVal == o.dVal ? 0 : 1));
+		}
+	}
+
+	public static class ComplexNestedObject1 implements Comparable<ComplexNestedObject1> {
+
+		private double doubleValue;
+
+		private List<String> stringList;
+
+		public ComplexNestedObject1() {
+		}
+
+		public ComplexNestedObject1(double value, String... listElements) {
+			this.doubleValue = value;
+
+			this.stringList = new ArrayList<String>();
+			for (String str : listElements) {
+				this.stringList.add(str);
+			}
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			if (obj.getClass() == ComplexNestedObject1.class) {
+				ComplexNestedObject1 other = (ComplexNestedObject1) obj;
+				return other.doubleValue == this.doubleValue && this.stringList.equals(other.stringList);
+			} else {
+				return false;
+			}
+		}
+
+		@Override
+		public int compareTo(ComplexNestedObject1 o) {
+			int cmp = (this.doubleValue < o.doubleValue ? -1 : (this.doubleValue == o.doubleValue ? 0 : 1));
+			if (cmp != 0) {
+				return cmp;
+			}
+
+			int size = this.stringList.size();
+			int otherSize = o.stringList.size();
+
+			cmp = (size < otherSize ? -1 : (size == otherSize ? 0 : 1));
+			if (cmp != 0) {
+				return cmp;
+			}
+
+			for (int i = 0; i < size; i++) {
+				cmp = this.stringList.get(i).compareTo(o.stringList.get(i));
+				if (cmp != 0) {
+					return cmp;
+				}
+			}
+
+			return 0;
+		}
+	}
+
+	public static class Book implements Comparable<Book> {
+
+		private long bookId;
+		private String title;
+		private long authorId;
+
+		public Book() {
+		}
+
+		public Book(long bookId, String title, long authorId) {
+			this.bookId = bookId;
+			this.title = title;
+			this.authorId = authorId;
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			if (obj.getClass() == Book.class) {
+				Book other = (Book) obj;
+				return other.bookId == this.bookId && other.authorId == this.authorId && this.title.equals(other.title);
+			} else {
+				return false;
+			}
+		}
+
+		@Override
+		public int compareTo(Book o) {
+			int cmp = (this.bookId < o.bookId ? -1 : (this.bookId == o.bookId ? 0 : 1));
+			if (cmp != 0) {
+				return cmp;
+			}
+
+			cmp = title.compareTo(o.title);
+			if (cmp != 0) {
+				return cmp;
+			}
+
+			return (this.authorId < o.authorId ? -1 : (this.authorId == o.authorId ? 0 : 1));
+		}
+	}
+
+	public static class BookAuthor implements Comparable<BookAuthor> {
+
+		private long authorId;
+		private List<String> bookTitles;
+		private String authorName;
+
+		public BookAuthor() {
+		}
+
+		public BookAuthor(long authorId, List<String> bookTitles, String authorName) {
+			this.authorId = authorId;
+			this.bookTitles = bookTitles;
+			this.authorName = authorName;
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			if (obj.getClass() == BookAuthor.class) {
+				BookAuthor other = (BookAuthor) obj;
+				return other.authorName.equals(this.authorName) && other.authorId == this.authorId &&
+						other.bookTitles.equals(this.bookTitles);
+			} else {
+				return false;
+			}
+		}
+
+		@Override
+		public int compareTo(BookAuthor o) {
+			int cmp = (this.authorId < o.authorId ? -1 : (this.authorId == o.authorId ? 0 : 1));
+			if (cmp != 0) return cmp;
+
+			int size = this.bookTitles.size();
+			int oSize = o.bookTitles.size();
+			cmp = (size < oSize ? -1 : (size == oSize ? 0 : 1));
+			if (cmp != 0) return cmp;
+
+			for (int i = 0; i < size; i++) {
+				cmp = this.bookTitles.get(i).compareTo(o.bookTitles.get(i));
+				if (cmp != 0) return cmp;
+			}
+
+			return this.authorName.compareTo(o.authorName);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22203e75/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AbstractGenericTypeSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AbstractGenericTypeSerializerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AbstractGenericTypeSerializerTest.java
new file mode 100644
index 0000000..8f97c43
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AbstractGenericTypeSerializerTest.java
@@ -0,0 +1,300 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.SerializerTestInstance;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.runtime.AvroSerializer;
+import org.apache.flink.util.StringUtils;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+/**
+ * A test for the {@link AvroSerializer}.
+ */
+abstract public class AbstractGenericTypeSerializerTest {
+
+	private final Random rnd = new Random(349712539451944123L);
+
+
+	@Test
+	public void testString() {
+		runTests("abc", "",
+				StringUtils.getRandomString(new Random(289347567856686223L), 10, 100),
+				StringUtils.getRandomString(new Random(289347567856686223L), 1000, 5000),
+				StringUtils.getRandomString(new Random(289347567856686223L), 30000, 35000),
+				StringUtils.getRandomString(new Random(289347567856686223L), 100 * 1024, 105 * 1024));
+	}
+
+	@Test
+	public void testSimpleTypesObjects() {
+		SimpleTypes a = new SimpleTypes();
+		SimpleTypes b = new SimpleTypes(rnd.nextInt(), rnd.nextLong(), (byte) rnd.nextInt(),
+				StringUtils.getRandomString(rnd, 10, 100), (short) rnd.nextInt(), rnd.nextDouble());
+		SimpleTypes c = new SimpleTypes(rnd.nextInt(), rnd.nextLong(), (byte) rnd.nextInt(),
+				StringUtils.getRandomString(rnd, 10, 100), (short) rnd.nextInt(), rnd.nextDouble());
+		SimpleTypes d = new SimpleTypes(rnd.nextInt(), rnd.nextLong(), (byte) rnd.nextInt(),
+				StringUtils.getRandomString(rnd, 10, 100), (short) rnd.nextInt(), rnd.nextDouble());
+		SimpleTypes e = new SimpleTypes(rnd.nextInt(), rnd.nextLong(), (byte) rnd.nextInt(),
+				StringUtils.getRandomString(rnd, 10, 100), (short) rnd.nextInt(), rnd.nextDouble());
+		SimpleTypes f = new SimpleTypes(rnd.nextInt(), rnd.nextLong(), (byte) rnd.nextInt(),
+				StringUtils.getRandomString(rnd, 10, 100), (short) rnd.nextInt(), rnd.nextDouble());
+		SimpleTypes g = new SimpleTypes(rnd.nextInt(), rnd.nextLong(), (byte) rnd.nextInt(),
+				StringUtils.getRandomString(rnd, 10, 100), (short) rnd.nextInt(), rnd.nextDouble());
+
+		runTests(a, b, c, d, e, f, g);
+	}
+
+	@Test
+	public void testCompositeObject() {
+		ComplexNestedObject1 o1 = new ComplexNestedObject1(5626435);
+		ComplexNestedObject1 o2 = new ComplexNestedObject1(76923);
+		ComplexNestedObject1 o3 = new ComplexNestedObject1(-1100);
+		ComplexNestedObject1 o4 = new ComplexNestedObject1(0);
+		ComplexNestedObject1 o5 = new ComplexNestedObject1(44);
+
+		runTests(o1, o2, o3, o4, o5);
+	}
+
+	@Test
+	public void testNestedObjects() {
+		ComplexNestedObject2 o1 = new ComplexNestedObject2(rnd);
+		ComplexNestedObject2 o2 = new ComplexNestedObject2();
+		ComplexNestedObject2 o3 = new ComplexNestedObject2(rnd);
+		ComplexNestedObject2 o4 = new ComplexNestedObject2(rnd);
+
+		runTests(o1, o2, o3, o4);
+	}
+
+	@Test
+	public void testBeanStyleObjects() {
+		{
+			Book b1 = new Book(976243875L, "The Serialization Odysse", 42);
+			Book b2 = new Book(0L, "Debugging byte streams", 1337);
+			Book b3 = new Book(-1L, "Low level interfaces", 0xC0FFEE);
+
+			runTests(b1, b2, b3);
+		}
+
+		// object with collection
+		{
+			ArrayList<String> list = new ArrayList<String>();
+			list.add("A");
+			list.add("B");
+			list.add("C");
+			list.add("D");
+			list.add("E");
+
+			BookAuthor b1 = new BookAuthor(976243875L, list, "Arno Nym");
+
+			ArrayList<String> list2 = new ArrayList<String>();
+			BookAuthor b2 = new BookAuthor(987654321L, list2, "The Saurus");
+
+			runTests(b1, b2);
+		}
+	}
+
+	private final <T> void runTests(T... instances) {
+		if (instances == null || instances.length == 0) {
+			throw new IllegalArgumentException();
+		}
+
+		@SuppressWarnings("unchecked")
+		Class<T> clazz = (Class<T>) instances[0].getClass();
+
+		TypeSerializer<T> serializer = createSerializer(clazz);
+		SerializerTestInstance<T> test = new SerializerTestInstance<T>(serializer, clazz, -1, instances);
+		test.testAll();
+	}
+
+	abstract protected <T> TypeSerializer<T> createSerializer(Class<T> type);
+
+
+	// --------------------------------------------------------------------------------------------
+	//  Test Objects
+	// --------------------------------------------------------------------------------------------
+
+
+	public static final class SimpleTypes {
+
+		private final int iVal;
+		private final long lVal;
+		private final byte bVal;
+		private final String sVal;
+		private final short rVal;
+		private final double dVal;
+
+
+		public SimpleTypes() {
+			this(0, 0, (byte) 0, "", (short) 0, 0);
+		}
+
+		public SimpleTypes(int iVal, long lVal, byte bVal, String sVal, short rVal, double dVal) {
+			this.iVal = iVal;
+			this.lVal = lVal;
+			this.bVal = bVal;
+			this.sVal = sVal;
+			this.rVal = rVal;
+			this.dVal = dVal;
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			if (obj.getClass() == SimpleTypes.class) {
+				SimpleTypes other = (SimpleTypes) obj;
+
+				return other.iVal == this.iVal &&
+						other.lVal == this.lVal &&
+						other.bVal == this.bVal &&
+						other.sVal.equals(this.sVal) &&
+						other.rVal == this.rVal &&
+						other.dVal == this.dVal;
+
+			} else {
+				return false;
+			}
+		}
+
+		@Override
+		public String toString() {
+			return String.format("(%d, %d, %d, %s, %d, %f)", iVal, lVal, bVal, sVal, rVal, dVal);
+		}
+	}
+
+	public static class ComplexNestedObject1 {
+
+		private double doubleValue;
+
+		private List<String> stringList;
+
+		public ComplexNestedObject1() {
+		}
+
+		public ComplexNestedObject1(int offInit) {
+			this.doubleValue = 6293485.6723 + offInit;
+
+			this.stringList = new ArrayList<String>();
+			this.stringList.add("A" + offInit);
+			this.stringList.add("somewhat" + offInit);
+			this.stringList.add("random" + offInit);
+			this.stringList.add("collection" + offInit);
+			this.stringList.add("of" + offInit);
+			this.stringList.add("strings" + offInit);
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			if (obj.getClass() == ComplexNestedObject1.class) {
+				ComplexNestedObject1 other = (ComplexNestedObject1) obj;
+				return other.doubleValue == this.doubleValue && this.stringList.equals(other.stringList);
+			} else {
+				return false;
+			}
+		}
+	}
+
+	public static class ComplexNestedObject2 {
+
+		private long longValue;
+
+		private Map<String, ComplexNestedObject1> theMap = new HashMap<String, ComplexNestedObject1>();
+
+		public ComplexNestedObject2() {
+		}
+
+		public ComplexNestedObject2(Random rnd) {
+			this.longValue = rnd.nextLong();
+
+			this.theMap.put(String.valueOf(rnd.nextLong()), new ComplexNestedObject1(rnd.nextInt()));
+			this.theMap.put(String.valueOf(rnd.nextLong()), new ComplexNestedObject1(rnd.nextInt()));
+			this.theMap.put(String.valueOf(rnd.nextLong()), new ComplexNestedObject1(rnd.nextInt()));
+			this.theMap.put(String.valueOf(rnd.nextLong()), new ComplexNestedObject1(rnd.nextInt()));
+			this.theMap.put(String.valueOf(rnd.nextLong()), new ComplexNestedObject1(rnd.nextInt()));
+			this.theMap.put(String.valueOf(rnd.nextLong()), new ComplexNestedObject1(rnd.nextInt()));
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			if (obj.getClass() == ComplexNestedObject2.class) {
+				ComplexNestedObject2 other = (ComplexNestedObject2) obj;
+				return other.longValue == this.longValue && this.theMap.equals(other.theMap);
+			} else {
+				return false;
+			}
+		}
+	}
+
+	public static class Book {
+
+		private long bookId;
+		private String title;
+		private long authorId;
+
+		public Book() {
+		}
+
+		public Book(long bookId, String title, long authorId) {
+			this.bookId = bookId;
+			this.title = title;
+			this.authorId = authorId;
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			if (obj.getClass() == Book.class) {
+				Book other = (Book) obj;
+				return other.bookId == this.bookId && other.authorId == this.authorId && this.title.equals(other.title);
+			} else {
+				return false;
+			}
+		}
+	}
+
+	public static class BookAuthor {
+
+		private long authorId;
+		private List<String> bookTitles;
+		private String authorName;
+
+		public BookAuthor() {
+		}
+
+		public BookAuthor(long authorId, List<String> bookTitles, String authorName) {
+			this.authorId = authorId;
+			this.bookTitles = bookTitles;
+			this.authorName = authorName;
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			if (obj.getClass() == BookAuthor.class) {
+				BookAuthor other = (BookAuthor) obj;
+				return other.authorName.equals(this.authorName) && other.authorId == this.authorId &&
+						other.bookTitles.equals(this.bookTitles);
+			} else {
+				return false;
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22203e75/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericArraySerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericArraySerializerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericArraySerializerTest.java
new file mode 100644
index 0000000..50b4e06
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericArraySerializerTest.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+public class AvroGenericArraySerializerTest extends AbstractGenericArraySerializerTest {
+	@Override
+	protected <T> TypeSerializer<T> createComponentSerializer(Class<T> type) {
+		return new AvroSerializer<T>(type);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22203e75/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericTypeComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericTypeComparatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericTypeComparatorTest.java
new file mode 100644
index 0000000..587e7c4
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericTypeComparatorTest.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+public class AvroGenericTypeComparatorTest extends AbstractGenericTypeComparatorTest {
+	@Override
+	protected <T> TypeSerializer<T> createSerializer(Class<T> type) {
+		return new AvroSerializer<T>(type);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22203e75/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericTypeSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericTypeSerializerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericTypeSerializerTest.java
new file mode 100644
index 0000000..9f4d482
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericTypeSerializerTest.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+public class AvroGenericTypeSerializerTest extends AbstractGenericTypeSerializerTest {
+
+	@Override
+	protected <T> TypeSerializer<T> createSerializer(Class<T> type) {
+		return new AvroSerializer<T>(type);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22203e75/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/GenericArraySerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/GenericArraySerializerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/GenericArraySerializerTest.java
deleted file mode 100644
index af31684..0000000
--- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/GenericArraySerializerTest.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import java.lang.reflect.Array;
-import java.util.ArrayList;
-import java.util.Random;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import org.apache.flink.api.common.typeutils.SerializerTestInstance;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.api.java.typeutils.runtime.AvroSerializer;
-import org.apache.flink.api.java.typeutils.runtime.GenericArraySerializer;
-import org.apache.flink.api.java.typeutils.runtime.GenericTypeSerializerTest.Book;
-import org.apache.flink.api.java.typeutils.runtime.GenericTypeSerializerTest.BookAuthor;
-import org.apache.flink.api.java.typeutils.runtime.GenericTypeSerializerTest.ComplexNestedObject1;
-import org.apache.flink.api.java.typeutils.runtime.GenericTypeSerializerTest.ComplexNestedObject2;
-import org.apache.flink.api.java.typeutils.runtime.GenericTypeSerializerTest.SimpleTypes;
-import org.apache.flink.util.StringUtils;
-
-public class GenericArraySerializerTest {
-	
-	private final Random rnd = new Random(349712539451944123L);
-	
-	
-	@Test
-	public void testString() {
-		String[] arr1 = new String[] {"abc", "",
-				StringUtils.getRandomString(new Random(289347567856686223L), 10, 100),
-				StringUtils.getRandomString(new Random(289347567856686223L), 15, 50),
-				StringUtils.getRandomString(new Random(289347567856686223L), 30, 170),
-				StringUtils.getRandomString(new Random(289347567856686223L), 14, 15),
-				""};
-		
-		String[] arr2 = new String[] {"foo", "",
-				StringUtils.getRandomString(new Random(289347567856686223L), 10, 100),
-				StringUtils.getRandomString(new Random(289347567856686223L), 1000, 5000),
-				StringUtils.getRandomString(new Random(289347567856686223L), 30000, 35000),
-				StringUtils.getRandomString(new Random(289347567856686223L), 100*1024, 105*1024),
-				"bar"};
-		
-		// run tests with the string serializer as the component serializer
-		runTests(String.class, new StringSerializer(), arr1, arr2);
-		
-		// run the tests with the generic serializer as the component serializer
-		runTests(arr1, arr2);
-	}
-	
-	@Test
-	public void testSimpleTypesObjects() {
-		SimpleTypes a = new SimpleTypes();
-		SimpleTypes b = new SimpleTypes(rnd.nextInt(), rnd.nextLong(), (byte) rnd.nextInt(),
-				StringUtils.getRandomString(rnd, 10, 100), (short) rnd.nextInt(), rnd.nextDouble());
-		SimpleTypes c = new SimpleTypes(rnd.nextInt(), rnd.nextLong(), (byte) rnd.nextInt(),
-				StringUtils.getRandomString(rnd, 10, 100), (short) rnd.nextInt(), rnd.nextDouble());
-		SimpleTypes d = new SimpleTypes(rnd.nextInt(), rnd.nextLong(), (byte) rnd.nextInt(),
-				StringUtils.getRandomString(rnd, 10, 100), (short) rnd.nextInt(), rnd.nextDouble());
-		SimpleTypes e = new SimpleTypes(rnd.nextInt(), rnd.nextLong(), (byte) rnd.nextInt(),
-				StringUtils.getRandomString(rnd, 10, 100), (short) rnd.nextInt(), rnd.nextDouble());
-		SimpleTypes f = new SimpleTypes(rnd.nextInt(), rnd.nextLong(), (byte) rnd.nextInt(),
-				StringUtils.getRandomString(rnd, 10, 100), (short) rnd.nextInt(), rnd.nextDouble());
-		SimpleTypes g = new SimpleTypes(rnd.nextInt(), rnd.nextLong(), (byte) rnd.nextInt(),
-				StringUtils.getRandomString(rnd, 10, 100), (short) rnd.nextInt(), rnd.nextDouble());
-		
-		runTests(new SimpleTypes[] {a, b, c}, new SimpleTypes[] {d, e, f, g});
-	}
-	
-	@Test
-	public void testCompositeObject() {
-		ComplexNestedObject1 o1 = new ComplexNestedObject1(5626435);
-		ComplexNestedObject1 o2 = new ComplexNestedObject1(76923);
-		ComplexNestedObject1 o3 = new ComplexNestedObject1(-1100);
-		ComplexNestedObject1 o4 = new ComplexNestedObject1(0);
-		ComplexNestedObject1 o5 = new ComplexNestedObject1(44);
-		
-		runTests(new ComplexNestedObject1[] {o1, o2}, new ComplexNestedObject1[] {o3}, new ComplexNestedObject1[] {o4, o5});
-	}
-	
-	@Test
-	public void testNestedObjects() {
-		ComplexNestedObject2 o1 = new ComplexNestedObject2(rnd);
-		ComplexNestedObject2 o2 = new ComplexNestedObject2();
-		ComplexNestedObject2 o3 = new ComplexNestedObject2(rnd);
-		ComplexNestedObject2 o4 = new ComplexNestedObject2(rnd);
-		
-		runTests(	new ComplexNestedObject2[] {o1, o2, o3},
-					new ComplexNestedObject2[] {},
-					new ComplexNestedObject2[] {},
-					new ComplexNestedObject2[] {o4},
-					new ComplexNestedObject2[] {});
-	}
-	
-	@Test
-	public void testBeanStyleObjects() {
-		{
-			Book b1 = new Book(976243875L, "The Serialization Odysse", 42);
-			Book b2 = new Book(0L, "Debugging byte streams", 1337);
-			Book b3 = new Book(-1L, "Low level interfaces", 0xC0FFEE);
-			Book b4 = new Book(Long.MAX_VALUE, "The joy of bits and bytes", 0xDEADBEEF);
-			Book b5 = new Book(Long.MIN_VALUE, "Winnign a prize for creative test strings", 0xBADF00);
-			Book b6 = new Book(-2L, "Distributed Systems", 0xABCDEF0123456789L);
-			
-			runTests(	new Book[] {b1, b2},
-						new Book[] {},
-						new Book[] {},
-						new Book[] {},
-						new Book[] {},
-						new Book[] {b3, b4, b5, b6});
-		}
-		
-		// object with collection
-		{
-			ArrayList<String> list = new ArrayList<String>();
-			list.add("A");
-			list.add("B");
-			list.add("C");
-			list.add("D");
-			list.add("E");
-			
-			BookAuthor b1 = new BookAuthor(976243875L, list, "Arno Nym");
-			
-			ArrayList<String> list2 = new ArrayList<String>();
-			BookAuthor b2 = new BookAuthor(987654321L, list2, "The Saurus");
-			
-			runTests(new BookAuthor[] {b1, b2});
-		}
-	}
-
-	private final <T> void runTests(T[]... instances) {
-		try {
-			@SuppressWarnings("unchecked")
-			Class<T> type = (Class<T>) instances[0][0].getClass();
-			TypeSerializer<T> serializer = new AvroSerializer<T>(type);
-			runTests(type, serializer, instances);
-		}
-		catch (Exception e) {
-			System.err.println(e.getMessage());
-			e.printStackTrace();
-			Assert.fail(e.getMessage());
-		}
-	}
-	
-	private final <T> void runTests(Class<T> type, TypeSerializer<T> componentSerializer, T[]... instances) {
-		try {
-			if (type == null || componentSerializer == null || instances == null || instances.length == 0) {
-				throw new IllegalArgumentException();
-			}
-			
-			@SuppressWarnings("unchecked")
-			Class<T[]> arrayClass = (Class<T[]>) Array.newInstance(type, 0).getClass();
-			
-			GenericArraySerializer<T> serializer = createSerializer(type, componentSerializer);
-			SerializerTestInstance<T[]> test = new SerializerTestInstance<T[]>(serializer, arrayClass, -1, instances);
-			test.testAll();
-		}
-		catch (Exception e) {
-			System.err.println(e.getMessage());
-			e.printStackTrace();
-			Assert.fail(e.getMessage());
-		}
-	}
-	
-	private final <T> GenericArraySerializer<T> createSerializer(Class<T> type, TypeSerializer<T> componentSerializer) {
-		return new GenericArraySerializer<T>(type, componentSerializer);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22203e75/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/GenericTypeComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/GenericTypeComparatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/GenericTypeComparatorTest.java
deleted file mode 100644
index 154a06f..0000000
--- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/GenericTypeComparatorTest.java
+++ /dev/null
@@ -1,379 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import org.apache.flink.api.common.typeutils.ComparatorTestBase;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.runtime.AvroSerializer;
-import org.apache.flink.api.java.typeutils.runtime.GenericTypeComparator;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class GenericTypeComparatorTest {
-
-	@Test
-	public void testString() {
-		runTests(new String[]{
-				"",
-				"Lorem Ipsum Dolor Omit Longer",
-				"aaaa",
-				"abcd",
-				"abce",
-				"abdd",
-				"accd",
-				"bbcd"
-		});
-	}
-
-	@Test
-	public void testSimpleTypesObjects() {
-		runTests(
-				new SimpleTypes(0, 1, (byte) 2, "", (short) 3, 4.0),
-				new SimpleTypes(1, 1, (byte) 2, "", (short) 3, 4.0),
-				new SimpleTypes(1, 2, (byte) 2, "", (short) 3, 4.0),
-				new SimpleTypes(1, 2, (byte) 3, "", (short) 3, 4.0),
-				new SimpleTypes(1, 2, (byte) 3, "a", (short) 3, 4.0),
-				new SimpleTypes(1, 2, (byte) 3, "b", (short) 3, 4.0),
-				new SimpleTypes(1, 2, (byte) 3, "b", (short) 4, 4.0),
-				new SimpleTypes(1, 2, (byte) 3, "b", (short) 4, 6.0)
-		);
-	}
-
-	@Test
-	public void testCompositeObject() {
-		ComplexNestedObject1 o1 = new ComplexNestedObject1(-1100);
-		ComplexNestedObject1 o2 = new ComplexNestedObject1(0);
-		ComplexNestedObject1 o3 = new ComplexNestedObject1(44);
-		ComplexNestedObject1 o4 = new ComplexNestedObject1(76923, "A");
-		ComplexNestedObject1 o5 = new ComplexNestedObject1(5626435, "A somewhat random collection");
-
-		runTests(o1, o2, o3, o4, o5);
-	}
-
-	@Test
-	public void testBeanStyleObjects() {
-		{
-			Book b111 = new Book(-1L, "A Low level interfaces", 0xC);
-			Book b122 = new Book(-1L, "Low level interfaces", 0xC);
-			Book b123 = new Book(-1L, "Low level interfaces", 0xC0FFEE);
-
-			Book b2 = new Book(0L, "Debugging byte streams", 1337);
-			Book b3 = new Book(976243875L, "The Serialization Odysse", 42);
-
-			runTests(b111, b122, b123, b2, b3);
-		}
-
-		{
-			BookAuthor b1 = new BookAuthor(976243875L, new ArrayList<String>(), "Arno Nym");
-
-			ArrayList<String> list = new ArrayList<String>();
-			list.add("A");
-			list.add("B");
-			list.add("C");
-			list.add("D");
-			list.add("E");
-
-			BookAuthor b2 = new BookAuthor(976243875L, list, "The Saurus");
-
-			runTests(b1, b2);
-		}
-	}
-
-	// ------------------------------------------------------------------------
-
-	private final <T> void runTests(T... sortedTestData) {
-		ComparatorTestInstance<T> testBase = new ComparatorTestInstance<T>(sortedTestData);
-		testBase.testAll();
-	}
-
-	private static final <T> TypeSerializer<T> createSerializer(Class<T> type) {
-		return new AvroSerializer<T>(type);
-	}
-
-	// ------------------------------------------------------------------------
-	// test instance
-	// ------------------------------------------------------------------------
-
-	private class ComparatorTestInstance<T> extends ComparatorTestBase<T> {
-
-		private final T[] testData;
-
-		private final Class<T> type;
-
-		@SuppressWarnings("unchecked")
-		public ComparatorTestInstance(T[] testData) {
-			if (testData == null || testData.length == 0) {
-				throw new IllegalArgumentException();
-			}
-
-			this.testData = testData;
-			this.type = (Class<T>) testData[0].getClass();
-		}
-
-		@Override
-		@SuppressWarnings({ "unchecked", "rawtypes" })
-		protected TypeComparator<T> createComparator(boolean ascending) {
-			return new GenericTypeComparator(ascending, GenericTypeComparatorTest.createSerializer(this.type), this.type);
-		}
-
-		@Override
-		protected TypeSerializer<T> createSerializer() {
-			return GenericTypeComparatorTest.createSerializer(this.type);
-		}
-
-		@Override
-		protected T[] getSortedTestData() {
-			return this.testData;
-		}
-
-		public void testAll() {
-			testDuplicate();
-			testEquality();
-			testEqualityWithReference();
-			testInequality();
-			testInequalityWithReference();
-			testNormalizedKeysEqualsFullLength();
-			testNormalizedKeysEqualsHalfLength();
-			testNormalizedKeysGreatSmallFullLength();
-			testNormalizedKeysGreatSmallAscDescHalfLength();
-			testNormalizedKeyReadWriter();
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  test objects
-	// ------------------------------------------------------------------------
-
-	public static final class SimpleTypes implements Comparable<SimpleTypes> {
-
-		private final int iVal;
-		private final long lVal;
-		private final byte bVal;
-		private final String sVal;
-		private final short rVal;
-		private final double dVal;
-
-		public SimpleTypes() {
-			this(0, 0, (byte) 0, "", (short) 0, 0);
-		}
-
-		public SimpleTypes(int iVal, long lVal, byte bVal, String sVal, short rVal, double dVal) {
-			this.iVal = iVal;
-			this.lVal = lVal;
-			this.bVal = bVal;
-			this.sVal = sVal;
-			this.rVal = rVal;
-			this.dVal = dVal;
-		}
-
-		@Override
-		public String toString() {
-			return String.format("(%d, %d, %d, %s, %d, %f)", iVal, lVal, bVal, sVal, rVal, dVal);
-		}
-
-		@Override
-		public boolean equals(Object obj) {
-			if (obj.getClass() == SimpleTypes.class) {
-				SimpleTypes other = (SimpleTypes) obj;
-
-				return other.iVal == this.iVal &&
-						other.lVal == this.lVal &&
-						other.bVal == this.bVal &&
-						other.sVal.equals(this.sVal) &&
-						other.rVal == this.rVal &&
-						other.dVal == this.dVal;
-			} else {
-				return false;
-			}
-		}
-
-		@Override
-		public int compareTo(SimpleTypes o) {
-			int cmp = (this.iVal < o.iVal ? -1 : (this.iVal == o.iVal ? 0 : 1));
-			if (cmp != 0) {
-				return cmp;
-			}
-
-			cmp = (this.lVal < o.lVal ? -1 : (this.lVal == o.lVal ? 0 : 1));
-			if (cmp != 0) {
-				return cmp;
-			}
-
-			cmp = (this.bVal < o.bVal ? -1 : (this.bVal == o.bVal ? 0 : 1));
-			if (cmp != 0) {
-				return cmp;
-			}
-
-			cmp = this.sVal.compareTo(o.sVal);
-			if (cmp != 0) {
-				return cmp;
-			}
-
-			cmp = (this.rVal < o.rVal ? -1 : (this.rVal == o.rVal ? 0 : 1));
-			if (cmp != 0) {
-				return cmp;
-			}
-
-			return (this.dVal < o.dVal ? -1 : (this.dVal == o.dVal ? 0 : 1));
-		}
-	}
-
-	public static class ComplexNestedObject1 implements Comparable<ComplexNestedObject1> {
-
-		private double doubleValue;
-
-		private List<String> stringList;
-
-		public ComplexNestedObject1() {
-		}
-
-		public ComplexNestedObject1(double value, String... listElements) {
-			this.doubleValue = value;
-
-			this.stringList = new ArrayList<String>();
-			for (String str : listElements) {
-				this.stringList.add(str);
-			}
-		}
-
-		@Override
-		public boolean equals(Object obj) {
-			if (obj.getClass() == ComplexNestedObject1.class) {
-				ComplexNestedObject1 other = (ComplexNestedObject1) obj;
-				return other.doubleValue == this.doubleValue && this.stringList.equals(other.stringList);
-			} else {
-				return false;
-			}
-		}
-
-		@Override
-		public int compareTo(ComplexNestedObject1 o) {
-			int cmp = (this.doubleValue < o.doubleValue ? -1 : (this.doubleValue == o.doubleValue ? 0 : 1));
-			if (cmp != 0) {
-				return cmp;
-			}
-
-			int size = this.stringList.size();
-			int otherSize = o.stringList.size();
-
-			cmp = (size < otherSize ? -1 : (size == otherSize ? 0 : 1));
-			if (cmp != 0) {
-				return cmp;
-			}
-
-			for (int i = 0; i < size; i++) {
-				cmp = this.stringList.get(i).compareTo(o.stringList.get(i));
-				if (cmp != 0) {
-					return cmp;
-				}
-			}
-
-			return 0;
-		}
-	}
-
-	public static class Book implements Comparable<Book> {
-
-		private long bookId;
-		private String title;
-		private long authorId;
-
-		public Book() {
-		}
-
-		public Book(long bookId, String title, long authorId) {
-			this.bookId = bookId;
-			this.title = title;
-			this.authorId = authorId;
-		}
-
-		@Override
-		public boolean equals(Object obj) {
-			if (obj.getClass() == Book.class) {
-				Book other = (Book) obj;
-				return other.bookId == this.bookId && other.authorId == this.authorId && this.title.equals(other.title);
-			} else {
-				return false;
-			}
-		}
-
-		@Override
-		public int compareTo(Book o) {
-			int cmp = (this.bookId < o.bookId ? -1 : (this.bookId == o.bookId ? 0 : 1));
-			if (cmp != 0) {
-				return cmp;
-			}
-
-			cmp = title.compareTo(o.title);
-			if (cmp != 0) {
-				return cmp;
-			}
-
-			return (this.authorId < o.authorId ? -1 : (this.authorId == o.authorId ? 0 : 1));
-		}
-	}
-
-	public static class BookAuthor implements Comparable<BookAuthor> {
-
-		private long authorId;
-		private List<String> bookTitles;
-		private String authorName;
-
-		public BookAuthor() {
-		}
-
-		public BookAuthor(long authorId, List<String> bookTitles, String authorName) {
-			this.authorId = authorId;
-			this.bookTitles = bookTitles;
-			this.authorName = authorName;
-		}
-
-		@Override
-		public boolean equals(Object obj) {
-			if (obj.getClass() == BookAuthor.class) {
-				BookAuthor other = (BookAuthor) obj;
-				return other.authorName.equals(this.authorName) && other.authorId == this.authorId &&
-						other.bookTitles.equals(this.bookTitles);
-			} else {
-				return false;
-			}
-		}
-
-		@Override
-		public int compareTo(BookAuthor o) {
-			int cmp = (this.authorId < o.authorId ? -1 : (this.authorId == o.authorId ? 0 : 1));
-			if (cmp != 0) return cmp;
-
-			int size = this.bookTitles.size();
-			int oSize = o.bookTitles.size();
-			cmp = (size < oSize ? -1 : (size == oSize ? 0 : 1));
-			if (cmp != 0) return cmp;
-
-			for (int i = 0; i < size; i++) {
-				cmp = this.bookTitles.get(i).compareTo(o.bookTitles.get(i));
-				if (cmp != 0) return cmp;
-			}
-
-			return this.authorName.compareTo(o.authorName);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22203e75/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/GenericTypeSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/GenericTypeSerializerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/GenericTypeSerializerTest.java
deleted file mode 100644
index 84622f0..0000000
--- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/GenericTypeSerializerTest.java
+++ /dev/null
@@ -1,300 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import org.apache.flink.api.common.typeutils.SerializerTestInstance;
-import org.apache.flink.api.java.typeutils.runtime.AvroSerializer;
-import org.apache.flink.util.StringUtils;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-
-/**
- * A test for the {@link AvroSerializer}.
- */
-public class GenericTypeSerializerTest {
-
-	private final Random rnd = new Random(349712539451944123L);
-
-
-	@Test
-	public void testString() {
-		runTests("abc", "",
-				StringUtils.getRandomString(new Random(289347567856686223L), 10, 100),
-				StringUtils.getRandomString(new Random(289347567856686223L), 1000, 5000),
-				StringUtils.getRandomString(new Random(289347567856686223L), 30000, 35000),
-				StringUtils.getRandomString(new Random(289347567856686223L), 100 * 1024, 105 * 1024));
-	}
-
-	@Test
-	public void testSimpleTypesObjects() {
-		SimpleTypes a = new SimpleTypes();
-		SimpleTypes b = new SimpleTypes(rnd.nextInt(), rnd.nextLong(), (byte) rnd.nextInt(),
-				StringUtils.getRandomString(rnd, 10, 100), (short) rnd.nextInt(), rnd.nextDouble());
-		SimpleTypes c = new SimpleTypes(rnd.nextInt(), rnd.nextLong(), (byte) rnd.nextInt(),
-				StringUtils.getRandomString(rnd, 10, 100), (short) rnd.nextInt(), rnd.nextDouble());
-		SimpleTypes d = new SimpleTypes(rnd.nextInt(), rnd.nextLong(), (byte) rnd.nextInt(),
-				StringUtils.getRandomString(rnd, 10, 100), (short) rnd.nextInt(), rnd.nextDouble());
-		SimpleTypes e = new SimpleTypes(rnd.nextInt(), rnd.nextLong(), (byte) rnd.nextInt(),
-				StringUtils.getRandomString(rnd, 10, 100), (short) rnd.nextInt(), rnd.nextDouble());
-		SimpleTypes f = new SimpleTypes(rnd.nextInt(), rnd.nextLong(), (byte) rnd.nextInt(),
-				StringUtils.getRandomString(rnd, 10, 100), (short) rnd.nextInt(), rnd.nextDouble());
-		SimpleTypes g = new SimpleTypes(rnd.nextInt(), rnd.nextLong(), (byte) rnd.nextInt(),
-				StringUtils.getRandomString(rnd, 10, 100), (short) rnd.nextInt(), rnd.nextDouble());
-
-		runTests(a, b, c, d, e, f, g);
-	}
-
-	@Test
-	public void testCompositeObject() {
-		ComplexNestedObject1 o1 = new ComplexNestedObject1(5626435);
-		ComplexNestedObject1 o2 = new ComplexNestedObject1(76923);
-		ComplexNestedObject1 o3 = new ComplexNestedObject1(-1100);
-		ComplexNestedObject1 o4 = new ComplexNestedObject1(0);
-		ComplexNestedObject1 o5 = new ComplexNestedObject1(44);
-
-		runTests(o1, o2, o3, o4, o5);
-	}
-
-	@Test
-	public void testNestedObjects() {
-		ComplexNestedObject2 o1 = new ComplexNestedObject2(rnd);
-		ComplexNestedObject2 o2 = new ComplexNestedObject2();
-		ComplexNestedObject2 o3 = new ComplexNestedObject2(rnd);
-		ComplexNestedObject2 o4 = new ComplexNestedObject2(rnd);
-
-		runTests(o1, o2, o3, o4);
-	}
-
-	@Test
-	public void testBeanStyleObjects() {
-		{
-			Book b1 = new Book(976243875L, "The Serialization Odysse", 42);
-			Book b2 = new Book(0L, "Debugging byte streams", 1337);
-			Book b3 = new Book(-1L, "Low level interfaces", 0xC0FFEE);
-
-			runTests(b1, b2, b3);
-		}
-
-		// object with collection
-		{
-			ArrayList<String> list = new ArrayList<String>();
-			list.add("A");
-			list.add("B");
-			list.add("C");
-			list.add("D");
-			list.add("E");
-
-			BookAuthor b1 = new BookAuthor(976243875L, list, "Arno Nym");
-
-			ArrayList<String> list2 = new ArrayList<String>();
-			BookAuthor b2 = new BookAuthor(987654321L, list2, "The Saurus");
-
-			runTests(b1, b2);
-		}
-	}
-
-	private final <T> void runTests(T... instances) {
-		if (instances == null || instances.length == 0) {
-			throw new IllegalArgumentException();
-		}
-
-		@SuppressWarnings("unchecked")
-		Class<T> clazz = (Class<T>) instances[0].getClass();
-
-		AvroSerializer<T> serializer = createSerializer(clazz);
-		SerializerTestInstance<T> test = new SerializerTestInstance<T>(serializer, clazz, -1, instances);
-		test.testAll();
-	}
-
-	private final <T> AvroSerializer<T> createSerializer(Class<T> type) {
-		return new AvroSerializer<T>(type);
-	}
-
-	// --------------------------------------------------------------------------------------------
-	//  Test Objects
-	// --------------------------------------------------------------------------------------------
-
-
-	public static final class SimpleTypes {
-
-		private final int iVal;
-		private final long lVal;
-		private final byte bVal;
-		private final String sVal;
-		private final short rVal;
-		private final double dVal;
-
-
-		public SimpleTypes() {
-			this(0, 0, (byte) 0, "", (short) 0, 0);
-		}
-
-		public SimpleTypes(int iVal, long lVal, byte bVal, String sVal, short rVal, double dVal) {
-			this.iVal = iVal;
-			this.lVal = lVal;
-			this.bVal = bVal;
-			this.sVal = sVal;
-			this.rVal = rVal;
-			this.dVal = dVal;
-		}
-
-		@Override
-		public boolean equals(Object obj) {
-			if (obj.getClass() == SimpleTypes.class) {
-				SimpleTypes other = (SimpleTypes) obj;
-
-				return other.iVal == this.iVal &&
-						other.lVal == this.lVal &&
-						other.bVal == this.bVal &&
-						other.sVal.equals(this.sVal) &&
-						other.rVal == this.rVal &&
-						other.dVal == this.dVal;
-
-			} else {
-				return false;
-			}
-		}
-
-		@Override
-		public String toString() {
-			return String.format("(%d, %d, %d, %s, %d, %f)", iVal, lVal, bVal, sVal, rVal, dVal);
-		}
-	}
-
-	public static class ComplexNestedObject1 {
-
-		private double doubleValue;
-
-		private List<String> stringList;
-
-		public ComplexNestedObject1() {
-		}
-
-		public ComplexNestedObject1(int offInit) {
-			this.doubleValue = 6293485.6723 + offInit;
-
-			this.stringList = new ArrayList<String>();
-			this.stringList.add("A" + offInit);
-			this.stringList.add("somewhat" + offInit);
-			this.stringList.add("random" + offInit);
-			this.stringList.add("collection" + offInit);
-			this.stringList.add("of" + offInit);
-			this.stringList.add("strings" + offInit);
-		}
-
-		@Override
-		public boolean equals(Object obj) {
-			if (obj.getClass() == ComplexNestedObject1.class) {
-				ComplexNestedObject1 other = (ComplexNestedObject1) obj;
-				return other.doubleValue == this.doubleValue && this.stringList.equals(other.stringList);
-			} else {
-				return false;
-			}
-		}
-	}
-
-	public static class ComplexNestedObject2 {
-
-		private long longValue;
-
-		private Map<String, ComplexNestedObject1> theMap = new HashMap<String, ComplexNestedObject1>();
-
-		public ComplexNestedObject2() {
-		}
-
-		public ComplexNestedObject2(Random rnd) {
-			this.longValue = rnd.nextLong();
-
-			this.theMap.put(String.valueOf(rnd.nextLong()), new ComplexNestedObject1(rnd.nextInt()));
-			this.theMap.put(String.valueOf(rnd.nextLong()), new ComplexNestedObject1(rnd.nextInt()));
-			this.theMap.put(String.valueOf(rnd.nextLong()), new ComplexNestedObject1(rnd.nextInt()));
-			this.theMap.put(String.valueOf(rnd.nextLong()), new ComplexNestedObject1(rnd.nextInt()));
-			this.theMap.put(String.valueOf(rnd.nextLong()), new ComplexNestedObject1(rnd.nextInt()));
-			this.theMap.put(String.valueOf(rnd.nextLong()), new ComplexNestedObject1(rnd.nextInt()));
-		}
-
-		@Override
-		public boolean equals(Object obj) {
-			if (obj.getClass() == ComplexNestedObject2.class) {
-				ComplexNestedObject2 other = (ComplexNestedObject2) obj;
-				return other.longValue == this.longValue && this.theMap.equals(other.theMap);
-			} else {
-				return false;
-			}
-		}
-	}
-
-	public static class Book {
-
-		private long bookId;
-		private String title;
-		private long authorId;
-
-		public Book() {
-		}
-
-		public Book(long bookId, String title, long authorId) {
-			this.bookId = bookId;
-			this.title = title;
-			this.authorId = authorId;
-		}
-
-		@Override
-		public boolean equals(Object obj) {
-			if (obj.getClass() == Book.class) {
-				Book other = (Book) obj;
-				return other.bookId == this.bookId && other.authorId == this.authorId && this.title.equals(other.title);
-			} else {
-				return false;
-			}
-		}
-	}
-
-	public static class BookAuthor {
-
-		private long authorId;
-		private List<String> bookTitles;
-		private String authorName;
-
-		public BookAuthor() {
-		}
-
-		public BookAuthor(long authorId, List<String> bookTitles, String authorName) {
-			this.authorId = authorId;
-			this.bookTitles = bookTitles;
-			this.authorName = authorName;
-		}
-
-		@Override
-		public boolean equals(Object obj) {
-			if (obj.getClass() == BookAuthor.class) {
-				BookAuthor other = (BookAuthor) obj;
-				return other.authorName.equals(this.authorName) && other.authorId == this.authorId &&
-						other.bookTitles.equals(this.bookTitles);
-			} else {
-				return false;
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22203e75/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericArraySerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericArraySerializerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericArraySerializerTest.java
new file mode 100644
index 0000000..b7fe093
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericArraySerializerTest.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+public class KryoGenericArraySerializerTest extends AbstractGenericArraySerializerTest {
+	@Override
+	protected <T> TypeSerializer<T> createComponentSerializer(Class<T> type) {
+		return new KryoSerializer<T>(type);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22203e75/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeComparatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeComparatorTest.java
new file mode 100644
index 0000000..f87d982
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeComparatorTest.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+public class KryoGenericTypeComparatorTest extends AbstractGenericTypeComparatorTest {
+	@Override
+	protected <T> TypeSerializer<T> createSerializer(Class<T> type) {
+		return new KryoSerializer<T>(type);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22203e75/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeSerializerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeSerializerTest.java
new file mode 100644
index 0000000..c4f2a65
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeSerializerTest.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+public class KryoGenericTypeSerializerTest extends AbstractGenericTypeSerializerTest {
+	@Override
+	protected <T> TypeSerializer<T> createSerializer(Class<T> type) {
+		return new KryoSerializer<T>(type);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22203e75/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerTest.java
index fa7653f..3850690 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerTest.java
@@ -26,14 +26,13 @@ import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.api.java.typeutils.runtime.GenericTypeSerializerTest.Book;
-import org.apache.flink.api.java.typeutils.runtime.GenericTypeSerializerTest.BookAuthor;
-import org.apache.flink.api.java.typeutils.runtime.GenericTypeSerializerTest.ComplexNestedObject1;
-import org.apache.flink.api.java.typeutils.runtime.GenericTypeSerializerTest.ComplexNestedObject2;
-import org.apache.flink.api.java.typeutils.runtime.GenericTypeSerializerTest.SimpleTypes;
+import org.apache.flink.api.java.typeutils.runtime.AbstractGenericTypeSerializerTest.Book;
+import org.apache.flink.api.java.typeutils.runtime.AbstractGenericTypeSerializerTest.BookAuthor;
+import org.apache.flink.api.java.typeutils.runtime.AbstractGenericTypeSerializerTest.ComplexNestedObject1;
+import org.apache.flink.api.java.typeutils.runtime.AbstractGenericTypeSerializerTest.ComplexNestedObject2;
+import org.apache.flink.api.java.typeutils.runtime.AbstractGenericTypeSerializerTest.SimpleTypes;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
 import org.apache.flink.util.StringUtils;
 import org.junit.Assert;
 import org.junit.Test;


[2/2] git commit: Default generic serializer is Avro. Minor improvements in KryoSerializer. Add mini benchmark for Avro/Kryo.

Posted by se...@apache.org.
Default generic serializer is Avro. Minor improvements in KryoSerializer. Add mini benchmark for Avro/Kryo.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/dac281f4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/dac281f4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/dac281f4

Branch: refs/heads/master
Commit: dac281f407a15020620ca74bc56303b3a4c3c850
Parents: 22203e7
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Sep 2 17:56:12 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Sep 2 18:26:20 2014 +0200

----------------------------------------------------------------------
 .../api/java/typeutils/GenericTypeInfo.java     |   4 +-
 .../java/typeutils/runtime/KryoSerializer.java  |  30 +-
 .../java/typeutils/runtime/TupleComparator.java |   1 +
 .../runtime/KryoVersusAvroMinibenchmark.java    | 767 +++++++++++++++++++
 4 files changed, 791 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dac281f4/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
index f7f78c1..cbc36ed 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
@@ -20,8 +20,8 @@ package org.apache.flink.api.java.typeutils;
 
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.runtime.AvroSerializer;
 import org.apache.flink.api.java.typeutils.runtime.GenericTypeComparator;
-import org.apache.flink.api.java.typeutils.runtime.KryoSerializer;
 import org.apache.flink.types.TypeInformation;
 
 
@@ -64,7 +64,7 @@ public class GenericTypeInfo<T> extends TypeInformation<T> implements AtomicType
 
 	@Override
 	public TypeSerializer<T> createSerializer() {
-		return new KryoSerializer<T>(this.typeClass);
+		return new AvroSerializer<T>(this.typeClass);
 	}
 
 	@SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dac281f4/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
index ad7f83a..aa1e98e 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
@@ -34,8 +34,13 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
 	private final Class<? extends T> typeToInstantiate;
 
 	private transient Kryo kryo;
-	private transient T copyInstance = null;
-	private transient Input in = null;
+	private transient T copyInstance;
+	
+	private transient DataOutputView previousOut;
+	private transient DataInputView previousIn;
+	
+	private transient Input input;
+	private transient Output output;
 
 	public KryoSerializer(Class<T> type){
 		this(type,type);
@@ -84,18 +89,25 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
 	@Override
 	public void serialize(T record, DataOutputView target) throws IOException {
 		checkKryoInitialized();
-		DataOutputViewStream outputStream = new DataOutputViewStream(target);
-		Output out = new Output(outputStream);
-		kryo.writeObject(out, record);
-		out.flush();
+		if (target != previousOut) {
+			DataOutputViewStream outputStream = new DataOutputViewStream(target);
+			output = new Output(outputStream);
+			previousOut = target;
+		}
+		
+		kryo.writeObject(output, record);
+		output.flush();
 	}
 
 	@Override
 	public T deserialize(T reuse, DataInputView source) throws IOException {
 		checkKryoInitialized();
-		DataInputViewStream inputStream = new DataInputViewStream(source);
-		Input in = new NoFetchingInput(inputStream);
-		reuse = kryo.readObject(in, typeToInstantiate);
+		if (source != previousIn) {
+			DataInputViewStream inputStream = new DataInputViewStream(source);
+			input = new NoFetchingInput(inputStream);
+			previousIn = source;
+		}
+		reuse = kryo.readObject(input, typeToInstantiate);
 		return reuse;
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dac281f4/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java
index c786345..e7dd25a 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java
@@ -235,6 +235,7 @@ public final class TupleComparator<T extends Tuple> extends TypeComparator<T> im
 		try {
 			for (; i < keyPositions.length; i++) {
 				int keyPos = keyPositions[i];
+				@SuppressWarnings("unchecked")
 				int cmp = comparators[i].compare((T)first.getField(keyPos), (T)second.getField(keyPos));
 				if (cmp != 0) {
 					return cmp;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dac281f4/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoVersusAvroMinibenchmark.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoVersusAvroMinibenchmark.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoVersusAvroMinibenchmark.java
new file mode 100644
index 0000000..52c17d6
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoVersusAvroMinibenchmark.java
@@ -0,0 +1,767 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.UTFDataFormatException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemoryUtils;
+
+public class KryoVersusAvroMinibenchmark {
+
+	private static final long SEED = 94762389741692387L;
+	
+	private static final Random rnd = new Random(SEED);
+	
+	private static final int NUM_ELEMENTS = 100000;
+	
+	private static final int NUM_RUNS = 10;
+	
+	
+	
+	public static void main(String[] args) throws Exception {
+		
+		final MyType[] elements = new MyType[NUM_ELEMENTS];
+		for (int i = 0; i < NUM_ELEMENTS; i++) {
+			elements[i] = MyType.getRandom();
+		}
+		
+		final MyType dummy = new MyType();
+		
+		long[] timesAvro = new long[NUM_RUNS];
+		long[] timesKryo = new long[NUM_RUNS];
+		
+		for (int i = 0; i < NUM_RUNS; i++) {
+			System.out.println("----------------- Starting run " + i + " ---------------------");
+			
+			System.out.println("Avro serializer");
+			{
+				final DataOutputSerializer outView = new DataOutputSerializer(100000000);
+				final AvroSerializer<MyType> serializer = new AvroSerializer<MyType>(MyType.class);
+				
+				long start = System.nanoTime();
+				
+				for (int k = 0; k < NUM_ELEMENTS; k++) {
+					serializer.serialize(elements[k], outView);
+				}
+				
+				final DataInputDeserializer inView = new DataInputDeserializer(outView.wrapAsByteBuffer());
+				for (int k = 0; k < NUM_ELEMENTS; k++) {
+					serializer.deserialize(dummy, inView);
+				}
+				
+				long elapsed = System.nanoTime() - start;
+				System.out.println("Took: " + (elapsed / 1000000) + " msecs");
+				timesAvro[i] = elapsed;
+			}
+			
+			System.gc();
+			
+			System.out.println("Kryo serializer");
+			{
+				final DataOutputSerializer outView = new DataOutputSerializer(100000000);
+				final KryoSerializer<MyType> serializer = new KryoSerializer<MyType>(MyType.class);
+				
+				long start = System.nanoTime();
+				
+				for (int k = 0; k < NUM_ELEMENTS; k++) {
+					serializer.serialize(elements[k], outView);
+				}
+				
+				final DataInputDeserializer inView = new DataInputDeserializer(outView.wrapAsByteBuffer());
+				for (int k = 0; k < NUM_ELEMENTS; k++) {
+					serializer.deserialize(dummy, inView);
+				}
+				
+				long elapsed = System.nanoTime() - start;
+				System.out.println("Took: " + (elapsed / 1000000) + " msecs");
+				timesKryo[i] = elapsed;
+			}
+		}
+	}
+	
+	
+	
+	
+	
+	public static class MyType {
+		
+		private String theString;
+		
+//		private Tuple2<Long, Double> theTuple;
+		
+		private List<Integer> theList;
+
+		
+		public MyType() {
+			theString = "";
+//			theTuple = new Tuple2<Long, Double>(0L, 0.0);
+			theList = new ArrayList<Integer>();
+		}
+		
+		public MyType(String theString, Tuple2<Long, Double> theTuple, List<Integer> theList) {
+			this.theString = theString;
+//			this.theTuple = theTuple;
+			this.theList = theList;
+		}
+
+		
+		public String getTheString() {
+			return theString;
+		}
+
+		public void setTheString(String theString) {
+			this.theString = theString;
+		}
+
+//		public Tuple2<Long, Double> getTheTuple() {
+//			return theTuple;
+//		}
+//
+//		public void setTheTuple(Tuple2<Long, Double> theTuple) {
+//			this.theTuple = theTuple;
+//		}
+
+		public List<Integer> getTheList() {
+			return theList;
+		}
+
+		public void setTheList(List<Integer> theList) {
+			this.theList = theList;
+		}
+		
+		
+		public static MyType getRandom() {
+			final int numListElements = rnd.nextInt(20);
+			List<Integer> list = new ArrayList<Integer>(numListElements);
+			for (int i = 0; i < numListElements; i++) {
+				list.add(rnd.nextInt());
+			}
+			
+			return new MyType(randomString(), new Tuple2<Long, Double>(rnd.nextLong(), rnd.nextDouble()), list);
+		}
+	}
+	
+	
+	private static String randomString() {
+		final int len = rnd.nextInt(100) + 20;
+		
+		StringBuilder bld = new StringBuilder();
+		for (int i = 0; i < len; i++) {
+			bld.append(rnd.nextInt('z' - 'a' + 1) + 'a');
+		}
+		return bld.toString();
+	}
+	
+	// ============================================================================================
+	// ============================================================================================
+	
+	public static final class DataOutputSerializer implements DataOutputView {
+		
+		private byte[] buffer;
+		
+		private int position;
+
+		private ByteBuffer wrapper;
+		
+		public DataOutputSerializer(int startSize) {
+			if (startSize < 1) {
+				throw new IllegalArgumentException();
+			}
+
+			this.buffer = new byte[startSize];
+			this.wrapper = ByteBuffer.wrap(buffer);
+		}
+		
+		public ByteBuffer wrapAsByteBuffer() {
+			this.wrapper.position(0);
+			this.wrapper.limit(this.position);
+			return this.wrapper;
+		}
+
+		public void clear() {
+			this.position = 0;
+		}
+
+		public int length() {
+			return this.position;
+		}
+
+		@Override
+		public String toString() {
+			return String.format("[pos=%d cap=%d]", this.position, this.buffer.length);
+		}
+
+		// ----------------------------------------------------------------------------------------
+		//                               Data Output
+		// ----------------------------------------------------------------------------------------
+		
+		@Override
+		public void write(int b) throws IOException {
+			if (this.position >= this.buffer.length) {
+				resize(1);
+			}
+			this.buffer[this.position++] = (byte) (b & 0xff);
+		}
+
+		@Override
+		public void write(byte[] b) throws IOException {
+			write(b, 0, b.length);
+		}
+
+		@Override
+		public void write(byte[] b, int off, int len) throws IOException {
+			if (len < 0 || off > b.length - len) {
+				throw new ArrayIndexOutOfBoundsException();
+			}
+			if (this.position > this.buffer.length - len) {
+				resize(len);
+			}
+			System.arraycopy(b, off, this.buffer, this.position, len);
+			this.position += len;
+		}
+
+		@Override
+		public void writeBoolean(boolean v) throws IOException {
+			write(v ? 1 : 0);
+		}
+
+		@Override
+		public void writeByte(int v) throws IOException {
+			write(v);
+		}
+
+		@Override
+		public void writeBytes(String s) throws IOException {
+			final int sLen = s.length();
+			if (this.position >= this.buffer.length - sLen) {
+				resize(sLen);
+			}
+			
+			for (int i = 0; i < sLen; i++) {
+				writeByte(s.charAt(i));
+			}
+			this.position += sLen;
+		}
+
+		@Override
+		public void writeChar(int v) throws IOException {
+			if (this.position >= this.buffer.length - 1) {
+				resize(2);
+			}
+			this.buffer[this.position++] = (byte) (v >> 8);
+			this.buffer[this.position++] = (byte) v;
+		}
+
+		@Override
+		public void writeChars(String s) throws IOException {
+			final int sLen = s.length();
+			if (this.position >= this.buffer.length - 2*sLen) {
+				resize(2*sLen);
+			} 
+			for (int i = 0; i < sLen; i++) {
+				writeChar(s.charAt(i));
+			}
+		}
+
+		@Override
+		public void writeDouble(double v) throws IOException {
+			writeLong(Double.doubleToLongBits(v));
+		}
+
+		@Override
+		public void writeFloat(float v) throws IOException {
+			writeInt(Float.floatToIntBits(v));
+		}
+
+		@SuppressWarnings("restriction")
+		@Override
+		public void writeInt(int v) throws IOException {
+			if (this.position >= this.buffer.length - 3) {
+				resize(4);
+			}
+			if (LITTLE_ENDIAN) {
+				v = Integer.reverseBytes(v);
+			}			
+			UNSAFE.putInt(this.buffer, BASE_OFFSET + this.position, v);
+			this.position += 4;
+		}
+
+		@SuppressWarnings("restriction")
+		@Override
+		public void writeLong(long v) throws IOException {
+			if (this.position >= this.buffer.length - 7) {
+				resize(8);
+			}
+			if (LITTLE_ENDIAN) {
+				v = Long.reverseBytes(v);
+			}
+			UNSAFE.putLong(this.buffer, BASE_OFFSET + this.position, v);
+			this.position += 8;
+		}
+
+		@Override
+		public void writeShort(int v) throws IOException {
+			if (this.position >= this.buffer.length - 1) {
+				resize(2);
+			}
+			this.buffer[this.position++] = (byte) ((v >>> 8) & 0xff);
+			this.buffer[this.position++] = (byte) ((v >>> 0) & 0xff);
+		}
+
+		@Override
+		public void writeUTF(String str) throws IOException {
+			int strlen = str.length();
+			int utflen = 0;
+			int c;
+
+			/* use charAt instead of copying String to char array */
+			for (int i = 0; i < strlen; i++) {
+				c = str.charAt(i);
+				if ((c >= 0x0001) && (c <= 0x007F)) {
+					utflen++;
+				} else if (c > 0x07FF) {
+					utflen += 3;
+				} else {
+					utflen += 2;
+				}
+			}
+
+			if (utflen > 65535) {
+				throw new UTFDataFormatException("Encoded string is too long: " + utflen);
+			}
+			else if (this.position > this.buffer.length - utflen - 2) {
+				resize(utflen + 2);
+			}
+			
+			byte[] bytearr = this.buffer;
+			int count = this.position;
+
+			bytearr[count++] = (byte) ((utflen >>> 8) & 0xFF);
+			bytearr[count++] = (byte) ((utflen >>> 0) & 0xFF);
+
+			int i = 0;
+			for (i = 0; i < strlen; i++) {
+				c = str.charAt(i);
+				if (!((c >= 0x0001) && (c <= 0x007F))) {
+					break;
+				}
+				bytearr[count++] = (byte) c;
+			}
+
+			for (; i < strlen; i++) {
+				c = str.charAt(i);
+				if ((c >= 0x0001) && (c <= 0x007F)) {
+					bytearr[count++] = (byte) c;
+
+				} else if (c > 0x07FF) {
+					bytearr[count++] = (byte) (0xE0 | ((c >> 12) & 0x0F));
+					bytearr[count++] = (byte) (0x80 | ((c >> 6) & 0x3F));
+					bytearr[count++] = (byte) (0x80 | ((c >> 0) & 0x3F));
+				} else {
+					bytearr[count++] = (byte) (0xC0 | ((c >> 6) & 0x1F));
+					bytearr[count++] = (byte) (0x80 | ((c >> 0) & 0x3F));
+				}
+			}
+
+			this.position = count;
+		}
+		
+		
+		private final void resize(int minCapacityAdd) throws IOException {
+			try {
+				final int newLen = Math.max(this.buffer.length * 2, this.buffer.length + minCapacityAdd);
+				final byte[] nb = new byte[newLen];
+				System.arraycopy(this.buffer, 0, nb, 0, this.position);
+				this.buffer = nb;
+				this.wrapper = ByteBuffer.wrap(this.buffer);
+			}
+			catch (NegativeArraySizeException nasex) {
+				throw new IOException("Serialization failed because the record length would exceed 2GB (max addressable array size in Java).");
+			}
+		}
+		
+		@SuppressWarnings("restriction")
+		private static final sun.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE;
+		
+		@SuppressWarnings("restriction")
+		private static final long BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class);
+		
+		private static final boolean LITTLE_ENDIAN = (MemoryUtils.NATIVE_BYTE_ORDER == ByteOrder.LITTLE_ENDIAN);
+
+		@Override
+		public void skipBytesToWrite(int numBytes) throws IOException {
+			if(buffer.length - this.position < numBytes){
+				throw new EOFException("Could not skip " + numBytes + " bytes.");
+			}
+
+			this.position += numBytes;
+		}
+
+		@Override
+		public void write(DataInputView source, int numBytes) throws IOException {
+			if(buffer.length - this.position < numBytes){
+				throw new EOFException("Could not write " + numBytes + " bytes. Buffer overflow.");
+			}
+
+			source.read(this.buffer, this.position, numBytes);
+			this.position += numBytes;
+		}
+		
+	}
+	
+	public static final class DataInputDeserializer implements DataInputView {
+		
+		private byte[] buffer;
+		
+		private int end;
+
+		private int position;
+
+		public DataInputDeserializer() {
+		}
+		
+		public DataInputDeserializer(byte[] buffer, int start, int len) {
+			setBuffer(buffer, start, len);
+		}
+		
+		public DataInputDeserializer(ByteBuffer buffer) {
+			setBuffer(buffer);
+		}
+
+		public void setBuffer(ByteBuffer buffer) {
+			if (buffer.hasArray()) {
+				this.buffer = buffer.array();
+				this.position = buffer.arrayOffset() + buffer.position();
+				this.end = this.position + buffer.remaining();
+			} else if (buffer.isDirect()) {
+				this.buffer = new byte[buffer.remaining()];
+				this.position = 0;
+				this.end = this.buffer.length;
+
+				buffer.get(this.buffer);
+			} else {
+				throw new IllegalArgumentException("The given buffer is neither an array-backed heap ByteBuffer, nor a direct ByteBuffer.");
+			}
+		}
+
+		public void setBuffer(byte[] buffer, int start, int len) {
+			if (buffer == null) {
+				throw new NullPointerException();
+			}
+
+			if (start < 0 || len < 0 || start + len >= buffer.length) {
+				throw new IllegalArgumentException();
+			}
+
+			this.buffer = buffer;
+			this.position = start;
+			this.end = start * len;
+		}
+
+		// ----------------------------------------------------------------------------------------
+		//                               Data Input
+		// ----------------------------------------------------------------------------------------
+		
+		@Override
+		public boolean readBoolean() throws IOException {
+			if (this.position < this.end) {
+				return this.buffer[this.position++] != 0;
+			} else {
+				throw new EOFException();
+			}
+		}
+
+		@Override
+		public byte readByte() throws IOException {
+			if (this.position < this.end) {
+				return this.buffer[this.position++];
+			} else {
+				throw new EOFException();
+			}
+		}
+
+		@Override
+		public char readChar() throws IOException {
+			if (this.position < this.end - 1) {
+				return (char) (((this.buffer[this.position++] & 0xff) << 8) | ((this.buffer[this.position++] & 0xff) << 0));
+			} else {
+				throw new EOFException();
+			}
+		}
+
+		@Override
+		public double readDouble() throws IOException {
+			return Double.longBitsToDouble(readLong());
+		}
+
+		@Override
+		public float readFloat() throws IOException {
+			return Float.intBitsToFloat(readInt());
+		}
+
+		@Override
+		public void readFully(byte[] b) throws IOException {
+			readFully(b, 0, b.length);
+		}
+
+		@Override
+		public void readFully(byte[] b, int off, int len) throws IOException {
+			if (len >= 0) {
+				if (off <= b.length - len) {
+					if (this.position <= this.end - len) {
+						System.arraycopy(this.buffer, position, b, off, len);
+						position += len;
+					} else {
+						throw new EOFException();
+					}
+				} else {
+					throw new ArrayIndexOutOfBoundsException();
+				}
+			} else if (len < 0) {
+				throw new IllegalArgumentException("Length may not be negative.");
+			}
+		}
+
+		@Override
+		public int readInt() throws IOException {
+			if (this.position >= 0 && this.position < this.end - 3) {
+				@SuppressWarnings("restriction")
+				int value = UNSAFE.getInt(this.buffer, BASE_OFFSET + this.position);
+				if (LITTLE_ENDIAN) {
+					value = Integer.reverseBytes(value);
+				}
+				
+				this.position += 4;
+				return value;
+			} else {
+				throw new EOFException();
+			}
+		}
+
+		@Override
+		public String readLine() throws IOException {
+			if (this.position < this.end) {
+				// read until a newline is found
+				StringBuilder bld = new StringBuilder();
+				char curr = (char) readUnsignedByte();
+				while (position < this.end && curr != '\n') {
+					bld.append(curr);
+					curr = (char) readUnsignedByte();
+				}
+				// trim a trailing carriage return
+				int len = bld.length();
+				if (len > 0 && bld.charAt(len - 1) == '\r') {
+					bld.setLength(len - 1);
+				}
+				String s = bld.toString();
+				bld.setLength(0);
+				return s;
+			} else {
+				return null;
+			}
+		}
+
+		@Override
+		public long readLong() throws IOException {
+			if (position >= 0 && position < this.end - 7) {
+				@SuppressWarnings("restriction")
+				long value = UNSAFE.getLong(this.buffer, BASE_OFFSET + this.position);
+				if (LITTLE_ENDIAN) {
+					value = Long.reverseBytes(value);
+				}
+				this.position += 8;
+				return value;
+			} else {
+				throw new EOFException();
+			}
+		}
+
+		@Override
+		public short readShort() throws IOException {
+			if (position >= 0 && position < this.end - 1) {
+				return (short) ((((this.buffer[position++]) & 0xff) << 8) | (((this.buffer[position++]) & 0xff) << 0));
+			} else {
+				throw new EOFException();
+			}
+		}
+
+		@Override
+		public String readUTF() throws IOException {
+			int utflen = readUnsignedShort();
+			byte[] bytearr = new byte[utflen];
+			char[] chararr = new char[utflen];
+
+			int c, char2, char3;
+			int count = 0;
+			int chararr_count = 0;
+
+			readFully(bytearr, 0, utflen);
+
+			while (count < utflen) {
+				c = (int) bytearr[count] & 0xff;
+				if (c > 127) {
+					break;
+				}
+				count++;
+				chararr[chararr_count++] = (char) c;
+			}
+
+			while (count < utflen) {
+				c = (int) bytearr[count] & 0xff;
+				switch (c >> 4) {
+				case 0:
+				case 1:
+				case 2:
+				case 3:
+				case 4:
+				case 5:
+				case 6:
+				case 7:
+					/* 0xxxxxxx */
+					count++;
+					chararr[chararr_count++] = (char) c;
+					break;
+				case 12:
+				case 13:
+					/* 110x xxxx 10xx xxxx */
+					count += 2;
+					if (count > utflen) {
+						throw new UTFDataFormatException("malformed input: partial character at end");
+					}
+					char2 = (int) bytearr[count - 1];
+					if ((char2 & 0xC0) != 0x80) {
+						throw new UTFDataFormatException("malformed input around byte " + count);
+					}
+					chararr[chararr_count++] = (char) (((c & 0x1F) << 6) | (char2 & 0x3F));
+					break;
+				case 14:
+					/* 1110 xxxx 10xx xxxx 10xx xxxx */
+					count += 3;
+					if (count > utflen) {
+						throw new UTFDataFormatException("malformed input: partial character at end");
+					}
+					char2 = (int) bytearr[count - 2];
+					char3 = (int) bytearr[count - 1];
+					if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80)) {
+						throw new UTFDataFormatException("malformed input around byte " + (count - 1));
+					}
+					chararr[chararr_count++] = (char) (((c & 0x0F) << 12) | ((char2 & 0x3F) << 6) | ((char3 & 0x3F) << 0));
+					break;
+				default:
+					/* 10xx xxxx, 1111 xxxx */
+					throw new UTFDataFormatException("malformed input around byte " + count);
+				}
+			}
+			// The number of chars produced may be less than utflen
+			return new String(chararr, 0, chararr_count);
+		}
+
+		@Override
+		public int readUnsignedByte() throws IOException {
+			if (this.position < this.end) {
+				return (this.buffer[this.position++] & 0xff);
+			} else {
+				throw new EOFException();
+			}
+		}
+
+		@Override
+		public int readUnsignedShort() throws IOException {
+			if (this.position < this.end - 1) {
+				return ((this.buffer[this.position++] & 0xff) << 8) | ((this.buffer[this.position++] & 0xff) << 0);
+			} else {
+				throw new EOFException();
+			}
+		}
+		
+		@Override
+		public int skipBytes(int n) throws IOException {
+			if (this.position <= this.end - n) {
+				this.position += n;
+				return n;
+			} else {
+				n = this.end - this.position;
+				this.position = this.end;
+				return n;
+			}
+		}
+		
+		@SuppressWarnings("restriction")
+		private static final sun.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE;
+		
+		@SuppressWarnings("restriction")
+		private static final long BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class);
+		
+		private static final boolean LITTLE_ENDIAN = (MemoryUtils.NATIVE_BYTE_ORDER == ByteOrder.LITTLE_ENDIAN);
+
+		@Override
+		public void skipBytesToRead(int numBytes) throws IOException {
+			int skippedBytes = skipBytes(numBytes);
+
+			if(skippedBytes < numBytes){
+				throw new EOFException("Could not skip " + numBytes +" bytes.");
+			}
+		}
+
+		@Override
+		public int read(byte[] b, int off, int len) throws IOException {
+			if(b == null){
+				throw new NullPointerException("Byte array b cannot be null.");
+			}
+
+			if(off < 0){
+				throw new IndexOutOfBoundsException("Offset cannot be negative.");
+			}
+
+			if(len < 0){
+				throw new IndexOutOfBoundsException("Length cannot be negative.");
+			}
+
+			if(b.length - off < len){
+				throw new IndexOutOfBoundsException("Byte array does not provide enough space to store requested data" +
+						".");
+			}
+
+			if(this.position >= this.end){
+				return -1;
+			}else{
+				int toRead = Math.min(this.end-this.position, len);
+				System.arraycopy(this.buffer,this.position,b,off,toRead);
+				this.position += toRead;
+
+				return toRead;
+			}
+		}
+
+		@Override
+		public int read(byte[] b) throws IOException {
+			return read(b, 0, b.length);
+		}
+	}
+}